You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bu...@apache.org on 2016/12/09 21:45:02 UTC

[3/6] accumulo git commit: ACCUMULO-4533 TraceServer shouldn't abort given problems with trace table checks.

ACCUMULO-4533 TraceServer shouldn't abort given problems with trace table checks.

Signed-off-by: Mike Drob <md...@cloudera.com>
Signed-off-by: Josh Elser <el...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/47b57f73
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/47b57f73
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/47b57f73

Branch: refs/heads/master
Commit: 47b57f7300ae5d1df9536cf07d99c83bfb8e2af6
Parents: 3c45441
Author: Sean Busbey <bu...@cloudera.com>
Authored: Wed Dec 7 10:45:51 2016 -0600
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Fri Dec 9 15:21:54 2016 -0600

----------------------------------------------------------------------
 .../org/apache/accumulo/tracer/TraceServer.java | 50 ++++++++++++++------
 1 file changed, 35 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/47b57f73/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
----------------------------------------------------------------------
diff --git a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
index dbb593d..67bd9d5 100644
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
@@ -28,12 +28,16 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.Properties;
 import org.apache.accumulo.core.client.security.tokens.KerberosToken;
@@ -182,6 +186,35 @@ public class TraceServer implements Watcher {
     log.info("Instance " + serverConfiguration.getInstance().getInstanceID());
     AccumuloConfiguration conf = serverConfiguration.getConfiguration();
     table = conf.get(Property.TRACE_TABLE);
+    connector = ensureTraceTableExists(conf);
+
+    int port = conf.getPort(Property.TRACE_PORT);
+    final ServerSocket sock = ServerSocketChannel.open().socket();
+    sock.setReuseAddress(true);
+    sock.bind(new InetSocketAddress(hostname, port));
+    final TServerTransport transport = new TServerSocket(sock);
+    TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
+    options.processor(new Processor<Iface>(new Receiver()));
+    server = new TThreadPoolServer(options);
+    registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort(), conf.get(Property.TRACE_ZK_PATH));
+    writer = new AtomicReference<>(this.connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(BATCH_WRITER_MAX_LATENCY, TimeUnit.SECONDS)));
+  }
+
+  /**
+   * Exceptions thrown out of here should be things that cause service failure (e.g. misconfigurations that aren't likely to change on retry).
+   *
+   * @return a working Connection that can be reused
+   * @throws ClassNotFoundException
+   *           if TRACE_TOKEN_TYPE is set to a class that we can't load.
+   * @throws InstantiationException
+   *           if we fail to create an instance of TRACE_TOKEN_TYPE.
+   * @throws IllegalAccessException
+   *           if the class pointed to by TRACE_TOKEN_TYPE is private.
+   * @throws AccumuloSecurityException
+   *           if the trace user has the wrong permissions
+   */
+  private Connector ensureTraceTableExists(final AccumuloConfiguration conf) throws AccumuloSecurityException, ClassNotFoundException, InstantiationException,
+      IllegalAccessException {
     Connector connector = null;
     while (true) {
       try {
@@ -221,25 +254,12 @@ public class TraceServer implements Watcher {
         }
         connector.tableOperations().setProperty(table, Property.TABLE_FORMATTER_CLASS.getKey(), TraceFormatter.class.getName());
         break;
-      } catch (RuntimeException ex) {
+      } catch (AccumuloException | TableExistsException | TableNotFoundException | IOException | RuntimeException ex) {
         log.info("Waiting to checking/create the trace table.", ex);
         UtilWaitThread.sleep(1000);
       }
     }
-    this.connector = connector;
-    // make sure we refer to the final variable from now on.
-    connector = null;
-
-    int port = conf.getPort(Property.TRACE_PORT);
-    final ServerSocket sock = ServerSocketChannel.open().socket();
-    sock.setReuseAddress(true);
-    sock.bind(new InetSocketAddress(hostname, port));
-    final TServerTransport transport = new TServerSocket(sock);
-    TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
-    options.processor(new Processor<Iface>(new Receiver()));
-    server = new TThreadPoolServer(options);
-    registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort(), conf.get(Property.TRACE_ZK_PATH));
-    writer = new AtomicReference<>(this.connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(BATCH_WRITER_MAX_LATENCY, TimeUnit.SECONDS)));
+    return connector;
   }
 
   public void run() throws Exception {