You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bu...@apache.org on 2014/01/23 08:36:22 UTC
[04/23] git commit: ACCUMULO-2213 Make writer recovery in the Tracer more robust.
ACCUMULO-2213 Make writer recovery in the Tracer more robust.
* Cleans up writer resetting in the TraceServer and avoids overly broad exception catching.
* Tones down log levels in the TraceServer to WARN, because trace information is transient and we retry everything.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/dfafd9c1
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/dfafd9c1
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/dfafd9c1
Branch: refs/heads/master
Commit: dfafd9c1104d8359ca1eabe345661c541766d57f
Parents: 57f9b6c
Author: Sean Busbey <bu...@cloudera.com>
Authored: Tue Jan 21 14:05:56 2014 -0600
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Wed Jan 22 16:36:34 2014 -0600
----------------------------------------------------------------------
.../accumulo/server/trace/TraceServer.java | 68 ++++++++++++++------
1 file changed, 49 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/dfafd9c1/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
----------------------------------------------------------------------
diff --git a/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java b/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
index 4d89e9c..0e2010a 100644
--- a/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
+++ b/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.server.trace;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.ServerSocketChannel;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.TimerTask;
import org.apache.accumulo.cloudtrace.instrument.Span;
@@ -27,6 +28,7 @@ import org.apache.accumulo.cloudtrace.thrift.SpanReceiver;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
@@ -64,8 +66,8 @@ public class TraceServer implements Watcher {
final private static Logger log = Logger.getLogger(TraceServer.class);
final private AccumuloConfiguration conf;
final private TServer server;
- private BatchWriter writer = null;
- private Connector connector;
+ final private AtomicReference<BatchWriter> writer;
+ final private Connector connector;
final String table;
private static void put(Mutation m, String cf, String cq, byte[] bytes, int len) {
@@ -128,16 +130,27 @@ public class TraceServer implements Watcher {
put(timeMutation, "id", idString, transport.get(), transport.len());
}
try {
- if (writer == null)
- resetWriter();
- if (writer == null)
+ final BatchWriter writer = TraceServer.this.writer.get();
+ /* Check for null, because we expect spans to come in much faster than flush calls.
+ In the case of failure, we'd rather avoid logging tons of NPEs.
+ */
+ if (null == writer) {
+ log.warn("writer is not ready; discarding span.");
return;
+ }
writer.addMutation(spanMutation);
writer.addMutation(indexMutation);
if (timeMutation != null)
writer.addMutation(timeMutation);
- } catch (Exception ex) {
- log.error("Unable to write mutation to table: " + spanMutation, ex);
+ } catch (MutationsRejectedException exception) {
+ log.warn("Unable to write mutation to table; discarding span. set log level to DEBUG for span information and stacktrace. cause: " + exception);
+ if (log.isDebugEnabled()) {
+ log.debug("discarded span due to rejection of mutation: " + spanMutation, exception);
+ }
+ /* XXX this could be e.g. an IllegalArgumentExceptoion if we're trying to write this mutation to a writer that has been closed since we retrieved it */
+ } catch (RuntimeException exception) {
+ log.warn("Unable to write mutation to table; discarding span. set log level to DEBUG for stacktrace. cause: " + exception);
+ log.debug("unable to write mutation to table due to exception.", exception);
}
}
@@ -147,6 +160,7 @@ public class TraceServer implements Watcher {
Accumulo.init("tracer");
conf = ServerConfiguration.getSystemConfiguration();
table = conf.get(Property.TRACE_TABLE);
+ Connector connector = null;
while (true) {
try {
connector = HdfsZooInstance.getInstance().getConnector(conf.get(Property.TRACE_USER), conf.get(Property.TRACE_PASSWORD).getBytes());
@@ -160,6 +174,9 @@ public class TraceServer implements Watcher {
UtilWaitThread.sleep(1000);
}
}
+ this.connector = connector;
+ // make sure we refer to the final variable from now on.
+ connector = null;
int port = conf.getPort(Property.TRACE_PORT);
final ServerSocket sock = ServerSocketChannel.open().socket();
@@ -171,8 +188,7 @@ public class TraceServer implements Watcher {
server = new TThreadPoolServer(options);
final InetSocketAddress address = new InetSocketAddress(Accumulo.getLocalAddress(args), sock.getLocalPort());
registerInZooKeeper(AddressUtil.toString(address));
-
- writer = connector.createBatchWriter(table, 100l * 1024 * 1024, 5 * 1000l, 10);
+ writer = new AtomicReference<BatchWriter>(this.connector.createBatchWriter(table, 100l * 1024 * 1024, 5 * 1000l, 10));
}
public void run() throws Exception {
@@ -187,25 +203,39 @@ public class TraceServer implements Watcher {
private void flush() {
try {
- writer.flush();
- } catch (Exception e) {
- log.error("Error flushing traces", e);
+ final BatchWriter writer = this.writer.get();
+ if (null != writer) {
+ writer.flush();
+ }
+ } catch (MutationsRejectedException exception) {
+ log.warn("Problem flushing traces, resetting writer. Set log level to DEBUG to see stacktrace. cause: " + exception);
+ log.debug("flushing traces failed due to exception", exception);
+ resetWriter();
+ /* XXX e.g. if the writer was closed between when we grabbed it and when we called flush. */
+ } catch (RuntimeException exception) {
+ log.warn("Problem flushing traces, resetting writer. Set log level to DEBUG to see stacktrace. cause: " + exception);
+ log.debug("flushing traces failed due to exception", exception);
resetWriter();
}
}
- synchronized private void resetWriter() {
+ private void resetWriter() {
+ BatchWriter writer = null;
try {
- if (writer != null)
- writer.close();
+ writer = connector.createBatchWriter(table, 100l * 1024 * 1024, 5 * 1000l, 10);
} catch (Exception ex) {
- log.error("Error closing batch writer", ex);
+ log.warn("Unable to create a batch writer, will retry. Set log level to DEBUG to see stacktrace. cause: " + ex);
+ log.debug("batch writer creation failed with exception.", ex);
} finally {
- writer = null;
+ /* Trade in the new writer (even if null) for the one we need to close. */
+ writer = this.writer.getAndSet(writer);
try {
- writer = connector.createBatchWriter(table, 100l * 1024 * 1024, 5 * 1000l, 10);
+ if (null != writer) {
+ writer.close();
+ }
} catch (Exception ex) {
- log.error("Unable to create a batch writer: " + ex);
+ log.warn("Problem closing batch writer. Set log level to DEBUG to see stacktrace. cause: " + ex);
+ log.debug("batch writer close failed with exception", ex);
}
}
}