You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bu...@apache.org on 2016/12/09 21:45:04 UTC

[5/6] accumulo git commit: Merge branch '1.7' into 1.8

Merge branch '1.7' into 1.8

 Conflicts:
	server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/037c1384
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/037c1384
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/037c1384

Branch: refs/heads/1.8
Commit: 037c1384ac861be95d8c9d1c5d1acbf61e40e2fe
Parents: 7c7bbab 47b57f7
Author: Sean Busbey <bu...@cloudera.com>
Authored: Fri Dec 9 15:33:37 2016 -0600
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Fri Dec 9 15:33:37 2016 -0600

----------------------------------------------------------------------
 .../org/apache/accumulo/tracer/TraceServer.java | 76 ++++++++++++--------
 1 file changed, 48 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/037c1384/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
----------------------------------------------------------------------
diff --cc server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
index 2b6bcbf,67bd9d5..7c0d9b2
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
@@@ -183,7 -185,36 +187,49 @@@ public class TraceServer implements Wat
      log.info("Version " + Constants.VERSION);
      log.info("Instance " + serverConfiguration.getInstance().getInstanceID());
      AccumuloConfiguration conf = serverConfiguration.getConfiguration();
 -    table = conf.get(Property.TRACE_TABLE);
 +    tableName = conf.get(Property.TRACE_TABLE);
+     connector = ensureTraceTableExists(conf);
+ 
 -    int port = conf.getPort(Property.TRACE_PORT);
 -    final ServerSocket sock = ServerSocketChannel.open().socket();
 -    sock.setReuseAddress(true);
 -    sock.bind(new InetSocketAddress(hostname, port));
++    int ports[] = conf.getPort(Property.TRACE_PORT);
++    ServerSocket sock = null;
++    for (int port : ports) {
++      ServerSocket s = ServerSocketChannel.open().socket();
++      s.setReuseAddress(true);
++      try {
++        s.bind(new InetSocketAddress(hostname, port));
++        sock = s;
++        break;
++      } catch (Exception e) {
++        log.warn("Unable to start trace server on port {}", port);
++      }
++    }
++    if (null == sock) {
++      throw new RuntimeException("Unable to start trace server on configured ports: " + Arrays.toString(ports));
++    }
+     final TServerTransport transport = new TServerSocket(sock);
+     TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
+     options.processor(new Processor<Iface>(new Receiver()));
+     server = new TThreadPoolServer(options);
+     registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort(), conf.get(Property.TRACE_ZK_PATH));
 -    writer = new AtomicReference<>(this.connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(BATCH_WRITER_MAX_LATENCY, TimeUnit.SECONDS)));
++    writer = new AtomicReference<>(this.connector.createBatchWriter(tableName,
++        new BatchWriterConfig().setMaxLatency(BATCH_WRITER_MAX_LATENCY, TimeUnit.SECONDS)));
+   }
+ 
+   /**
+    * Exceptions thrown out of here should be things that cause service failure (e.g. misconfigurations that aren't likely to change on retry).
+    *
+    * @return a working Connector that can be reused
+    * @throws ClassNotFoundException
+    *           if TRACE_TOKEN_TYPE is set to a class that we can't load.
+    * @throws InstantiationException
+    *           if we fail to create an instance of TRACE_TOKEN_TYPE.
+    * @throws IllegalAccessException
+    *           if the class pointed to by TRACE_TOKEN_TYPE is private.
+    * @throws AccumuloSecurityException
+    *           if the trace user has the wrong permissions
+    */
+   private Connector ensureTraceTableExists(final AccumuloConfiguration conf) throws AccumuloSecurityException, ClassNotFoundException, InstantiationException,
+       IllegalAccessException {
      Connector connector = null;
      while (true) {
        try {
@@@ -215,46 -246,20 +261,20 @@@
          }
  
          connector = serverConfiguration.getInstance().getConnector(principal, at);
 -        if (!connector.tableOperations().exists(table)) {
 -          connector.tableOperations().create(table);
 +        if (!connector.tableOperations().exists(tableName)) {
 +          connector.tableOperations().create(tableName);
            IteratorSetting setting = new IteratorSetting(10, "ageoff", AgeOffFilter.class.getName());
            AgeOffFilter.setTTL(setting, 7 * 24 * 60 * 60 * 1000l);
 -          connector.tableOperations().attachIterator(table, setting);
 +          connector.tableOperations().attachIterator(tableName, setting);
          }
 -        connector.tableOperations().setProperty(table, Property.TABLE_FORMATTER_CLASS.getKey(), TraceFormatter.class.getName());
 +        connector.tableOperations().setProperty(tableName, Property.TABLE_FORMATTER_CLASS.getKey(), TraceFormatter.class.getName());
          break;
-       } catch (RuntimeException ex) {
+       } catch (AccumuloException | TableExistsException | TableNotFoundException | IOException | RuntimeException ex) {
          log.info("Waiting to checking/create the trace table.", ex);
 -        UtilWaitThread.sleep(1000);
 +        sleepUninterruptibly(1, TimeUnit.SECONDS);
        }
      }
-     this.connector = connector;
-     // make sure we refer to the final variable from now on.
-     connector = null;
- 
-     int ports[] = conf.getPort(Property.TRACE_PORT);
-     ServerSocket sock = null;
-     for (int port : ports) {
-       ServerSocket s = ServerSocketChannel.open().socket();
-       s.setReuseAddress(true);
-       try {
-         s.bind(new InetSocketAddress(hostname, port));
-         sock = s;
-         break;
-       } catch (Exception e) {
-         log.warn("Unable to start trace server on port {}", port);
-       }
-     }
-     if (null == sock) {
-       throw new RuntimeException("Unable to start trace server on configured ports: " + Arrays.toString(ports));
-     }
-     final TServerTransport transport = new TServerSocket(sock);
-     TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
-     options.processor(new Processor<Iface>(new Receiver()));
-     server = new TThreadPoolServer(options);
-     registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort(), conf.get(Property.TRACE_ZK_PATH));
-     writer = new AtomicReference<>(this.connector.createBatchWriter(tableName,
-         new BatchWriterConfig().setMaxLatency(BATCH_WRITER_MAX_LATENCY, TimeUnit.SECONDS)));
+     return connector;
    }
  
    public void run() throws Exception {