You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bu...@apache.org on 2016/12/09 21:45:02 UTC
[3/6] accumulo git commit: ACCUMULO-4533 TraceServer shouldn't abort
given problems with trace table checks.
ACCUMULO-4533 TraceServer shouldn't abort given problems with trace table checks.
Signed-off-by: Mike Drob <md...@cloudera.com>
Signed-off-by: Josh Elser <el...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/47b57f73
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/47b57f73
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/47b57f73
Branch: refs/heads/master
Commit: 47b57f7300ae5d1df9536cf07d99c83bfb8e2af6
Parents: 3c45441
Author: Sean Busbey <bu...@cloudera.com>
Authored: Wed Dec 7 10:45:51 2016 -0600
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Fri Dec 9 15:21:54 2016 -0600
----------------------------------------------------------------------
.../org/apache/accumulo/tracer/TraceServer.java | 50 ++++++++++++++------
1 file changed, 35 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/47b57f73/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
----------------------------------------------------------------------
diff --git a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
index dbb593d..67bd9d5 100644
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
@@ -28,12 +28,16 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.Properties;
import org.apache.accumulo.core.client.security.tokens.KerberosToken;
@@ -182,6 +186,35 @@ public class TraceServer implements Watcher {
log.info("Instance " + serverConfiguration.getInstance().getInstanceID());
AccumuloConfiguration conf = serverConfiguration.getConfiguration();
table = conf.get(Property.TRACE_TABLE);
+ connector = ensureTraceTableExists(conf);
+
+ int port = conf.getPort(Property.TRACE_PORT);
+ final ServerSocket sock = ServerSocketChannel.open().socket();
+ sock.setReuseAddress(true);
+ sock.bind(new InetSocketAddress(hostname, port));
+ final TServerTransport transport = new TServerSocket(sock);
+ TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
+ options.processor(new Processor<Iface>(new Receiver()));
+ server = new TThreadPoolServer(options);
+ registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort(), conf.get(Property.TRACE_ZK_PATH));
+ writer = new AtomicReference<>(this.connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(BATCH_WRITER_MAX_LATENCY, TimeUnit.SECONDS)));
+ }
+
+ /**
+ * Exceptions thrown out of here should be things that cause service failure (e.g. misconfigurations that aren't likely to change on retry).
+ *
+ * @return a working Connection that can be reused
+ * @throws ClassNotFoundException
+ * if TRACE_TOKEN_TYPE is set to a class that we can't load.
+ * @throws InstantiationException
+ * if we fail to create an instance of TRACE_TOKEN_TYPE.
+ * @throws IllegalAccessException
+ * if the class pointed to by TRACE_TOKEN_TYPE is private.
+ * @throws AccumuloSecurityException
+ * if the trace user has the wrong permissions
+ */
+ private Connector ensureTraceTableExists(final AccumuloConfiguration conf) throws AccumuloSecurityException, ClassNotFoundException, InstantiationException,
+ IllegalAccessException {
Connector connector = null;
while (true) {
try {
@@ -221,25 +254,12 @@ public class TraceServer implements Watcher {
}
connector.tableOperations().setProperty(table, Property.TABLE_FORMATTER_CLASS.getKey(), TraceFormatter.class.getName());
break;
- } catch (RuntimeException ex) {
+ } catch (AccumuloException | TableExistsException | TableNotFoundException | IOException | RuntimeException ex) {
log.info("Waiting to checking/create the trace table.", ex);
UtilWaitThread.sleep(1000);
}
}
- this.connector = connector;
- // make sure we refer to the final variable from now on.
- connector = null;
-
- int port = conf.getPort(Property.TRACE_PORT);
- final ServerSocket sock = ServerSocketChannel.open().socket();
- sock.setReuseAddress(true);
- sock.bind(new InetSocketAddress(hostname, port));
- final TServerTransport transport = new TServerSocket(sock);
- TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
- options.processor(new Processor<Iface>(new Receiver()));
- server = new TThreadPoolServer(options);
- registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort(), conf.get(Property.TRACE_ZK_PATH));
- writer = new AtomicReference<>(this.connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(BATCH_WRITER_MAX_LATENCY, TimeUnit.SECONDS)));
+ return connector;
}
public void run() throws Exception {