You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bu...@apache.org on 2016/12/09 21:45:00 UTC
[1/6] accumulo git commit: ACCUMULO-4533 TraceServer shouldn't abort
given problems with trace table checks.
Repository: accumulo
Updated Branches:
refs/heads/1.7 3c4544144 -> 47b57f730
refs/heads/1.8 7c7bbabb8 -> 037c1384a
refs/heads/master 8e0f19a1c -> 3dc9b373d
ACCUMULO-4533 TraceServer shouldn't abort given problems with trace table checks.
Signed-off-by: Mike Drob <md...@cloudera.com>
Signed-off-by: Josh Elser <el...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/47b57f73
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/47b57f73
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/47b57f73
Branch: refs/heads/1.7
Commit: 47b57f7300ae5d1df9536cf07d99c83bfb8e2af6
Parents: 3c45441
Author: Sean Busbey <bu...@cloudera.com>
Authored: Wed Dec 7 10:45:51 2016 -0600
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Fri Dec 9 15:21:54 2016 -0600
----------------------------------------------------------------------
.../org/apache/accumulo/tracer/TraceServer.java | 50 ++++++++++++++------
1 file changed, 35 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/47b57f73/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
----------------------------------------------------------------------
diff --git a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
index dbb593d..67bd9d5 100644
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
@@ -28,12 +28,16 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.Properties;
import org.apache.accumulo.core.client.security.tokens.KerberosToken;
@@ -182,6 +186,35 @@ public class TraceServer implements Watcher {
log.info("Instance " + serverConfiguration.getInstance().getInstanceID());
AccumuloConfiguration conf = serverConfiguration.getConfiguration();
table = conf.get(Property.TRACE_TABLE);
+ connector = ensureTraceTableExists(conf);
+
+ int port = conf.getPort(Property.TRACE_PORT);
+ final ServerSocket sock = ServerSocketChannel.open().socket();
+ sock.setReuseAddress(true);
+ sock.bind(new InetSocketAddress(hostname, port));
+ final TServerTransport transport = new TServerSocket(sock);
+ TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
+ options.processor(new Processor<Iface>(new Receiver()));
+ server = new TThreadPoolServer(options);
+ registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort(), conf.get(Property.TRACE_ZK_PATH));
+ writer = new AtomicReference<>(this.connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(BATCH_WRITER_MAX_LATENCY, TimeUnit.SECONDS)));
+ }
+
+ /**
+ * Exceptions thrown out of here should be things that cause service failure (e.g. misconfigurations that aren't likely to change on retry).
+ *
+ * @return a working Connection that can be reused
+ * @throws ClassNotFoundException
+ * if TRACE_TOKEN_TYPE is set to a class that we can't load.
+ * @throws InstantiationException
+ * if we fail to create an instance of TRACE_TOKEN_TYPE.
+ * @throws IllegalAccessException
+ * if the class pointed to by TRACE_TOKEN_TYPE is private.
+ * @throws AccumuloSecurityException
+ * if the trace user has the wrong permissions
+ */
+ private Connector ensureTraceTableExists(final AccumuloConfiguration conf) throws AccumuloSecurityException, ClassNotFoundException, InstantiationException,
+ IllegalAccessException {
Connector connector = null;
while (true) {
try {
@@ -221,25 +254,12 @@ public class TraceServer implements Watcher {
}
connector.tableOperations().setProperty(table, Property.TABLE_FORMATTER_CLASS.getKey(), TraceFormatter.class.getName());
break;
- } catch (RuntimeException ex) {
+ } catch (AccumuloException | TableExistsException | TableNotFoundException | IOException | RuntimeException ex) {
log.info("Waiting to checking/create the trace table.", ex);
UtilWaitThread.sleep(1000);
}
}
- this.connector = connector;
- // make sure we refer to the final variable from now on.
- connector = null;
-
- int port = conf.getPort(Property.TRACE_PORT);
- final ServerSocket sock = ServerSocketChannel.open().socket();
- sock.setReuseAddress(true);
- sock.bind(new InetSocketAddress(hostname, port));
- final TServerTransport transport = new TServerSocket(sock);
- TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
- options.processor(new Processor<Iface>(new Receiver()));
- server = new TThreadPoolServer(options);
- registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort(), conf.get(Property.TRACE_ZK_PATH));
- writer = new AtomicReference<>(this.connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(BATCH_WRITER_MAX_LATENCY, TimeUnit.SECONDS)));
+ return connector;
}
public void run() throws Exception {
[6/6] accumulo git commit: Merge branch '1.8'
Posted by bu...@apache.org.
Merge branch '1.8'
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3dc9b373
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3dc9b373
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3dc9b373
Branch: refs/heads/master
Commit: 3dc9b373d0fb9a78efefefc640e0e395e42ce1c7
Parents: 8e0f19a 037c138
Author: Sean Busbey <bu...@cloudera.com>
Authored: Fri Dec 9 15:39:14 2016 -0600
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Fri Dec 9 15:39:14 2016 -0600
----------------------------------------------------------------------
.../org/apache/accumulo/tracer/TraceServer.java | 76 ++++++++++++--------
1 file changed, 48 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
[2/6] accumulo git commit: ACCUMULO-4533 TraceServer shouldn't abort
given problems with trace table checks.
Posted by bu...@apache.org.
ACCUMULO-4533 TraceServer shouldn't abort given problems with trace table checks.
Signed-off-by: Mike Drob <md...@cloudera.com>
Signed-off-by: Josh Elser <el...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/47b57f73
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/47b57f73
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/47b57f73
Branch: refs/heads/1.8
Commit: 47b57f7300ae5d1df9536cf07d99c83bfb8e2af6
Parents: 3c45441
Author: Sean Busbey <bu...@cloudera.com>
Authored: Wed Dec 7 10:45:51 2016 -0600
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Fri Dec 9 15:21:54 2016 -0600
----------------------------------------------------------------------
.../org/apache/accumulo/tracer/TraceServer.java | 50 ++++++++++++++------
1 file changed, 35 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/47b57f73/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
----------------------------------------------------------------------
diff --git a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
index dbb593d..67bd9d5 100644
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
@@ -28,12 +28,16 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.Properties;
import org.apache.accumulo.core.client.security.tokens.KerberosToken;
@@ -182,6 +186,35 @@ public class TraceServer implements Watcher {
log.info("Instance " + serverConfiguration.getInstance().getInstanceID());
AccumuloConfiguration conf = serverConfiguration.getConfiguration();
table = conf.get(Property.TRACE_TABLE);
+ connector = ensureTraceTableExists(conf);
+
+ int port = conf.getPort(Property.TRACE_PORT);
+ final ServerSocket sock = ServerSocketChannel.open().socket();
+ sock.setReuseAddress(true);
+ sock.bind(new InetSocketAddress(hostname, port));
+ final TServerTransport transport = new TServerSocket(sock);
+ TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
+ options.processor(new Processor<Iface>(new Receiver()));
+ server = new TThreadPoolServer(options);
+ registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort(), conf.get(Property.TRACE_ZK_PATH));
+ writer = new AtomicReference<>(this.connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(BATCH_WRITER_MAX_LATENCY, TimeUnit.SECONDS)));
+ }
+
+ /**
+ * Exceptions thrown out of here should be things that cause service failure (e.g. misconfigurations that aren't likely to change on retry).
+ *
+ * @return a working Connection that can be reused
+ * @throws ClassNotFoundException
+ * if TRACE_TOKEN_TYPE is set to a class that we can't load.
+ * @throws InstantiationException
+ * if we fail to create an instance of TRACE_TOKEN_TYPE.
+ * @throws IllegalAccessException
+ * if the class pointed to by TRACE_TOKEN_TYPE is private.
+ * @throws AccumuloSecurityException
+ * if the trace user has the wrong permissions
+ */
+ private Connector ensureTraceTableExists(final AccumuloConfiguration conf) throws AccumuloSecurityException, ClassNotFoundException, InstantiationException,
+ IllegalAccessException {
Connector connector = null;
while (true) {
try {
@@ -221,25 +254,12 @@ public class TraceServer implements Watcher {
}
connector.tableOperations().setProperty(table, Property.TABLE_FORMATTER_CLASS.getKey(), TraceFormatter.class.getName());
break;
- } catch (RuntimeException ex) {
+ } catch (AccumuloException | TableExistsException | TableNotFoundException | IOException | RuntimeException ex) {
log.info("Waiting to checking/create the trace table.", ex);
UtilWaitThread.sleep(1000);
}
}
- this.connector = connector;
- // make sure we refer to the final variable from now on.
- connector = null;
-
- int port = conf.getPort(Property.TRACE_PORT);
- final ServerSocket sock = ServerSocketChannel.open().socket();
- sock.setReuseAddress(true);
- sock.bind(new InetSocketAddress(hostname, port));
- final TServerTransport transport = new TServerSocket(sock);
- TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
- options.processor(new Processor<Iface>(new Receiver()));
- server = new TThreadPoolServer(options);
- registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort(), conf.get(Property.TRACE_ZK_PATH));
- writer = new AtomicReference<>(this.connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(BATCH_WRITER_MAX_LATENCY, TimeUnit.SECONDS)));
+ return connector;
}
public void run() throws Exception {
[5/6] accumulo git commit: Merge branch '1.7' into 1.8
Posted by bu...@apache.org.
Merge branch '1.7' into 1.8
Conflicts:
server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/037c1384
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/037c1384
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/037c1384
Branch: refs/heads/1.8
Commit: 037c1384ac861be95d8c9d1c5d1acbf61e40e2fe
Parents: 7c7bbab 47b57f7
Author: Sean Busbey <bu...@cloudera.com>
Authored: Fri Dec 9 15:33:37 2016 -0600
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Fri Dec 9 15:33:37 2016 -0600
----------------------------------------------------------------------
.../org/apache/accumulo/tracer/TraceServer.java | 76 ++++++++++++--------
1 file changed, 48 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/037c1384/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
----------------------------------------------------------------------
diff --cc server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
index 2b6bcbf,67bd9d5..7c0d9b2
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
@@@ -183,7 -185,36 +187,49 @@@ public class TraceServer implements Wat
log.info("Version " + Constants.VERSION);
log.info("Instance " + serverConfiguration.getInstance().getInstanceID());
AccumuloConfiguration conf = serverConfiguration.getConfiguration();
- table = conf.get(Property.TRACE_TABLE);
+ tableName = conf.get(Property.TRACE_TABLE);
+ connector = ensureTraceTableExists(conf);
+
- int port = conf.getPort(Property.TRACE_PORT);
- final ServerSocket sock = ServerSocketChannel.open().socket();
- sock.setReuseAddress(true);
- sock.bind(new InetSocketAddress(hostname, port));
++ int ports[] = conf.getPort(Property.TRACE_PORT);
++ ServerSocket sock = null;
++ for (int port : ports) {
++ ServerSocket s = ServerSocketChannel.open().socket();
++ s.setReuseAddress(true);
++ try {
++ s.bind(new InetSocketAddress(hostname, port));
++ sock = s;
++ break;
++ } catch (Exception e) {
++ log.warn("Unable to start trace server on port {}", port);
++ }
++ }
++ if (null == sock) {
++ throw new RuntimeException("Unable to start trace server on configured ports: " + Arrays.toString(ports));
++ }
+ final TServerTransport transport = new TServerSocket(sock);
+ TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
+ options.processor(new Processor<Iface>(new Receiver()));
+ server = new TThreadPoolServer(options);
+ registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort(), conf.get(Property.TRACE_ZK_PATH));
- writer = new AtomicReference<>(this.connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(BATCH_WRITER_MAX_LATENCY, TimeUnit.SECONDS)));
++ writer = new AtomicReference<>(this.connector.createBatchWriter(tableName,
++ new BatchWriterConfig().setMaxLatency(BATCH_WRITER_MAX_LATENCY, TimeUnit.SECONDS)));
+ }
+
+ /**
+ * Exceptions thrown out of here should be things that cause service failure (e.g. misconfigurations that aren't likely to change on retry).
+ *
+ * @return a working Connection that can be reused
+ * @throws ClassNotFoundException
+ * if TRACE_TOKEN_TYPE is set to a class that we can't load.
+ * @throws InstantiationException
+ * if we fail to create an instance of TRACE_TOKEN_TYPE.
+ * @throws IllegalAccessException
+ * if the class pointed to by TRACE_TOKEN_TYPE is private.
+ * @throws AccumuloSecurityException
+ * if the trace user has the wrong permissions
+ */
+ private Connector ensureTraceTableExists(final AccumuloConfiguration conf) throws AccumuloSecurityException, ClassNotFoundException, InstantiationException,
+ IllegalAccessException {
Connector connector = null;
while (true) {
try {
@@@ -215,46 -246,20 +261,20 @@@
}
connector = serverConfiguration.getInstance().getConnector(principal, at);
- if (!connector.tableOperations().exists(table)) {
- connector.tableOperations().create(table);
+ if (!connector.tableOperations().exists(tableName)) {
+ connector.tableOperations().create(tableName);
IteratorSetting setting = new IteratorSetting(10, "ageoff", AgeOffFilter.class.getName());
AgeOffFilter.setTTL(setting, 7 * 24 * 60 * 60 * 1000l);
- connector.tableOperations().attachIterator(table, setting);
+ connector.tableOperations().attachIterator(tableName, setting);
}
- connector.tableOperations().setProperty(table, Property.TABLE_FORMATTER_CLASS.getKey(), TraceFormatter.class.getName());
+ connector.tableOperations().setProperty(tableName, Property.TABLE_FORMATTER_CLASS.getKey(), TraceFormatter.class.getName());
break;
- } catch (RuntimeException ex) {
+ } catch (AccumuloException | TableExistsException | TableNotFoundException | IOException | RuntimeException ex) {
log.info("Waiting to checking/create the trace table.", ex);
- UtilWaitThread.sleep(1000);
+ sleepUninterruptibly(1, TimeUnit.SECONDS);
}
}
- this.connector = connector;
- // make sure we refer to the final variable from now on.
- connector = null;
-
- int ports[] = conf.getPort(Property.TRACE_PORT);
- ServerSocket sock = null;
- for (int port : ports) {
- ServerSocket s = ServerSocketChannel.open().socket();
- s.setReuseAddress(true);
- try {
- s.bind(new InetSocketAddress(hostname, port));
- sock = s;
- break;
- } catch (Exception e) {
- log.warn("Unable to start trace server on port {}", port);
- }
- }
- if (null == sock) {
- throw new RuntimeException("Unable to start trace server on configured ports: " + Arrays.toString(ports));
- }
- final TServerTransport transport = new TServerSocket(sock);
- TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
- options.processor(new Processor<Iface>(new Receiver()));
- server = new TThreadPoolServer(options);
- registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort(), conf.get(Property.TRACE_ZK_PATH));
- writer = new AtomicReference<>(this.connector.createBatchWriter(tableName,
- new BatchWriterConfig().setMaxLatency(BATCH_WRITER_MAX_LATENCY, TimeUnit.SECONDS)));
+ return connector;
}
public void run() throws Exception {
[3/6] accumulo git commit: ACCUMULO-4533 TraceServer shouldn't abort
given problems with trace table checks.
Posted by bu...@apache.org.
ACCUMULO-4533 TraceServer shouldn't abort given problems with trace table checks.
Signed-off-by: Mike Drob <md...@cloudera.com>
Signed-off-by: Josh Elser <el...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/47b57f73
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/47b57f73
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/47b57f73
Branch: refs/heads/master
Commit: 47b57f7300ae5d1df9536cf07d99c83bfb8e2af6
Parents: 3c45441
Author: Sean Busbey <bu...@cloudera.com>
Authored: Wed Dec 7 10:45:51 2016 -0600
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Fri Dec 9 15:21:54 2016 -0600
----------------------------------------------------------------------
.../org/apache/accumulo/tracer/TraceServer.java | 50 ++++++++++++++------
1 file changed, 35 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/47b57f73/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
----------------------------------------------------------------------
diff --git a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
index dbb593d..67bd9d5 100644
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
@@ -28,12 +28,16 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.Properties;
import org.apache.accumulo.core.client.security.tokens.KerberosToken;
@@ -182,6 +186,35 @@ public class TraceServer implements Watcher {
log.info("Instance " + serverConfiguration.getInstance().getInstanceID());
AccumuloConfiguration conf = serverConfiguration.getConfiguration();
table = conf.get(Property.TRACE_TABLE);
+ connector = ensureTraceTableExists(conf);
+
+ int port = conf.getPort(Property.TRACE_PORT);
+ final ServerSocket sock = ServerSocketChannel.open().socket();
+ sock.setReuseAddress(true);
+ sock.bind(new InetSocketAddress(hostname, port));
+ final TServerTransport transport = new TServerSocket(sock);
+ TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
+ options.processor(new Processor<Iface>(new Receiver()));
+ server = new TThreadPoolServer(options);
+ registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort(), conf.get(Property.TRACE_ZK_PATH));
+ writer = new AtomicReference<>(this.connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(BATCH_WRITER_MAX_LATENCY, TimeUnit.SECONDS)));
+ }
+
+ /**
+ * Exceptions thrown out of here should be things that cause service failure (e.g. misconfigurations that aren't likely to change on retry).
+ *
+ * @return a working Connection that can be reused
+ * @throws ClassNotFoundException
+ * if TRACE_TOKEN_TYPE is set to a class that we can't load.
+ * @throws InstantiationException
+ * if we fail to create an instance of TRACE_TOKEN_TYPE.
+ * @throws IllegalAccessException
+ * if the class pointed to by TRACE_TOKEN_TYPE is private.
+ * @throws AccumuloSecurityException
+ * if the trace user has the wrong permissions
+ */
+ private Connector ensureTraceTableExists(final AccumuloConfiguration conf) throws AccumuloSecurityException, ClassNotFoundException, InstantiationException,
+ IllegalAccessException {
Connector connector = null;
while (true) {
try {
@@ -221,25 +254,12 @@ public class TraceServer implements Watcher {
}
connector.tableOperations().setProperty(table, Property.TABLE_FORMATTER_CLASS.getKey(), TraceFormatter.class.getName());
break;
- } catch (RuntimeException ex) {
+ } catch (AccumuloException | TableExistsException | TableNotFoundException | IOException | RuntimeException ex) {
log.info("Waiting to checking/create the trace table.", ex);
UtilWaitThread.sleep(1000);
}
}
- this.connector = connector;
- // make sure we refer to the final variable from now on.
- connector = null;
-
- int port = conf.getPort(Property.TRACE_PORT);
- final ServerSocket sock = ServerSocketChannel.open().socket();
- sock.setReuseAddress(true);
- sock.bind(new InetSocketAddress(hostname, port));
- final TServerTransport transport = new TServerSocket(sock);
- TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
- options.processor(new Processor<Iface>(new Receiver()));
- server = new TThreadPoolServer(options);
- registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort(), conf.get(Property.TRACE_ZK_PATH));
- writer = new AtomicReference<>(this.connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(BATCH_WRITER_MAX_LATENCY, TimeUnit.SECONDS)));
+ return connector;
}
public void run() throws Exception {
[4/6] accumulo git commit: Merge branch '1.7' into 1.8
Posted by bu...@apache.org.
Merge branch '1.7' into 1.8
Conflicts:
server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/037c1384
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/037c1384
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/037c1384
Branch: refs/heads/master
Commit: 037c1384ac861be95d8c9d1c5d1acbf61e40e2fe
Parents: 7c7bbab 47b57f7
Author: Sean Busbey <bu...@cloudera.com>
Authored: Fri Dec 9 15:33:37 2016 -0600
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Fri Dec 9 15:33:37 2016 -0600
----------------------------------------------------------------------
.../org/apache/accumulo/tracer/TraceServer.java | 76 ++++++++++++--------
1 file changed, 48 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/037c1384/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
----------------------------------------------------------------------
diff --cc server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
index 2b6bcbf,67bd9d5..7c0d9b2
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
@@@ -183,7 -185,36 +187,49 @@@ public class TraceServer implements Wat
log.info("Version " + Constants.VERSION);
log.info("Instance " + serverConfiguration.getInstance().getInstanceID());
AccumuloConfiguration conf = serverConfiguration.getConfiguration();
- table = conf.get(Property.TRACE_TABLE);
+ tableName = conf.get(Property.TRACE_TABLE);
+ connector = ensureTraceTableExists(conf);
+
- int port = conf.getPort(Property.TRACE_PORT);
- final ServerSocket sock = ServerSocketChannel.open().socket();
- sock.setReuseAddress(true);
- sock.bind(new InetSocketAddress(hostname, port));
++ int ports[] = conf.getPort(Property.TRACE_PORT);
++ ServerSocket sock = null;
++ for (int port : ports) {
++ ServerSocket s = ServerSocketChannel.open().socket();
++ s.setReuseAddress(true);
++ try {
++ s.bind(new InetSocketAddress(hostname, port));
++ sock = s;
++ break;
++ } catch (Exception e) {
++ log.warn("Unable to start trace server on port {}", port);
++ }
++ }
++ if (null == sock) {
++ throw new RuntimeException("Unable to start trace server on configured ports: " + Arrays.toString(ports));
++ }
+ final TServerTransport transport = new TServerSocket(sock);
+ TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
+ options.processor(new Processor<Iface>(new Receiver()));
+ server = new TThreadPoolServer(options);
+ registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort(), conf.get(Property.TRACE_ZK_PATH));
- writer = new AtomicReference<>(this.connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(BATCH_WRITER_MAX_LATENCY, TimeUnit.SECONDS)));
++ writer = new AtomicReference<>(this.connector.createBatchWriter(tableName,
++ new BatchWriterConfig().setMaxLatency(BATCH_WRITER_MAX_LATENCY, TimeUnit.SECONDS)));
+ }
+
+ /**
+ * Exceptions thrown out of here should be things that cause service failure (e.g. misconfigurations that aren't likely to change on retry).
+ *
+ * @return a working Connection that can be reused
+ * @throws ClassNotFoundException
+ * if TRACE_TOKEN_TYPE is set to a class that we can't load.
+ * @throws InstantiationException
+ * if we fail to create an instance of TRACE_TOKEN_TYPE.
+ * @throws IllegalAccessException
+ * if the class pointed to by TRACE_TOKEN_TYPE is private.
+ * @throws AccumuloSecurityException
+ * if the trace user has the wrong permissions
+ */
+ private Connector ensureTraceTableExists(final AccumuloConfiguration conf) throws AccumuloSecurityException, ClassNotFoundException, InstantiationException,
+ IllegalAccessException {
Connector connector = null;
while (true) {
try {
@@@ -215,46 -246,20 +261,20 @@@
}
connector = serverConfiguration.getInstance().getConnector(principal, at);
- if (!connector.tableOperations().exists(table)) {
- connector.tableOperations().create(table);
+ if (!connector.tableOperations().exists(tableName)) {
+ connector.tableOperations().create(tableName);
IteratorSetting setting = new IteratorSetting(10, "ageoff", AgeOffFilter.class.getName());
AgeOffFilter.setTTL(setting, 7 * 24 * 60 * 60 * 1000l);
- connector.tableOperations().attachIterator(table, setting);
+ connector.tableOperations().attachIterator(tableName, setting);
}
- connector.tableOperations().setProperty(table, Property.TABLE_FORMATTER_CLASS.getKey(), TraceFormatter.class.getName());
+ connector.tableOperations().setProperty(tableName, Property.TABLE_FORMATTER_CLASS.getKey(), TraceFormatter.class.getName());
break;
- } catch (RuntimeException ex) {
+ } catch (AccumuloException | TableExistsException | TableNotFoundException | IOException | RuntimeException ex) {
log.info("Waiting to checking/create the trace table.", ex);
- UtilWaitThread.sleep(1000);
+ sleepUninterruptibly(1, TimeUnit.SECONDS);
}
}
- this.connector = connector;
- // make sure we refer to the final variable from now on.
- connector = null;
-
- int ports[] = conf.getPort(Property.TRACE_PORT);
- ServerSocket sock = null;
- for (int port : ports) {
- ServerSocket s = ServerSocketChannel.open().socket();
- s.setReuseAddress(true);
- try {
- s.bind(new InetSocketAddress(hostname, port));
- sock = s;
- break;
- } catch (Exception e) {
- log.warn("Unable to start trace server on port {}", port);
- }
- }
- if (null == sock) {
- throw new RuntimeException("Unable to start trace server on configured ports: " + Arrays.toString(ports));
- }
- final TServerTransport transport = new TServerSocket(sock);
- TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
- options.processor(new Processor<Iface>(new Receiver()));
- server = new TThreadPoolServer(options);
- registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort(), conf.get(Property.TRACE_ZK_PATH));
- writer = new AtomicReference<>(this.connector.createBatchWriter(tableName,
- new BatchWriterConfig().setMaxLatency(BATCH_WRITER_MAX_LATENCY, TimeUnit.SECONDS)));
+ return connector;
}
public void run() throws Exception {