You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bu...@apache.org on 2014/01/23 08:36:20 UTC

[02/23] git commit: ACCUMULO-2213 Make writer recovery in the Tracer more robust.

ACCUMULO-2213 Make writer recovery in the Tracer more robust.

    * Cleans up writer resetting in the TraceServer, avoids overly broad catching.
    * Tones down log levels in TraceServer to WARN because trace information is transient and we retry everything.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/dfafd9c1
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/dfafd9c1
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/dfafd9c1

Branch: refs/heads/1.5.1-SNAPSHOT
Commit: dfafd9c1104d8359ca1eabe345661c541766d57f
Parents: 57f9b6c
Author: Sean Busbey <bu...@cloudera.com>
Authored: Tue Jan 21 14:05:56 2014 -0600
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Wed Jan 22 16:36:34 2014 -0600

----------------------------------------------------------------------
 .../accumulo/server/trace/TraceServer.java      | 68 ++++++++++++++------
 1 file changed, 49 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/dfafd9c1/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
----------------------------------------------------------------------
diff --git a/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java b/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
index 4d89e9c..0e2010a 100644
--- a/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
+++ b/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.server.trace;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.nio.channels.ServerSocketChannel;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.TimerTask;
 
 import org.apache.accumulo.cloudtrace.instrument.Span;
@@ -27,6 +28,7 @@ import org.apache.accumulo.cloudtrace.thrift.SpanReceiver;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Mutation;
@@ -64,8 +66,8 @@ public class TraceServer implements Watcher {
   final private static Logger log = Logger.getLogger(TraceServer.class);
   final private AccumuloConfiguration conf;
   final private TServer server;
-  private BatchWriter writer = null;
-  private Connector connector;
+  final private AtomicReference<BatchWriter> writer;
+  final private Connector connector;
   final String table;
 
   private static void put(Mutation m, String cf, String cq, byte[] bytes, int len) {
@@ -128,16 +130,27 @@ public class TraceServer implements Watcher {
         put(timeMutation, "id", idString, transport.get(), transport.len());
       }
       try {
-        if (writer == null)
-          resetWriter();
-        if (writer == null)
+        final BatchWriter writer = TraceServer.this.writer.get();
+        /* Check for null, because we expect spans to come in much faster than flush calls.
+           In the case of failure, we'd rather avoid logging tons of NPEs.
+         */
+        if (null == writer) {
+          log.warn("writer is not ready; discarding span.");
           return;
+        }
         writer.addMutation(spanMutation);
         writer.addMutation(indexMutation);
         if (timeMutation != null)
           writer.addMutation(timeMutation);
-      } catch (Exception ex) {
-        log.error("Unable to write mutation to table: " + spanMutation, ex);
+      } catch (MutationsRejectedException exception) {
+        log.warn("Unable to write mutation to table; discarding span. set log level to DEBUG for span information and stacktrace. cause: " + exception);
+        if (log.isDebugEnabled()) {
+          log.debug("discarded span due to rejection of mutation: " + spanMutation, exception);
+        }
+      /* XXX this could be e.g. an IllegalArgumentException if we're trying to write this mutation to a writer that has been closed since we retrieved it */
+      } catch (RuntimeException exception) {
+        log.warn("Unable to write mutation to table; discarding span. set log level to DEBUG for stacktrace. cause: " + exception);
+        log.debug("unable to write mutation to table due to exception.", exception);
       }
     }
     
@@ -147,6 +160,7 @@ public class TraceServer implements Watcher {
     Accumulo.init("tracer");
     conf = ServerConfiguration.getSystemConfiguration();
     table = conf.get(Property.TRACE_TABLE);
+    Connector connector = null;
     while (true) {
       try {
         connector = HdfsZooInstance.getInstance().getConnector(conf.get(Property.TRACE_USER), conf.get(Property.TRACE_PASSWORD).getBytes());
@@ -160,6 +174,9 @@ public class TraceServer implements Watcher {
         UtilWaitThread.sleep(1000);
       }
     }
+    this.connector = connector;
+    // make sure we refer to the final variable from now on.
+    connector = null;
     
     int port = conf.getPort(Property.TRACE_PORT);
     final ServerSocket sock = ServerSocketChannel.open().socket();
@@ -171,8 +188,7 @@ public class TraceServer implements Watcher {
     server = new TThreadPoolServer(options);
     final InetSocketAddress address = new InetSocketAddress(Accumulo.getLocalAddress(args), sock.getLocalPort());
     registerInZooKeeper(AddressUtil.toString(address));
-    
-    writer = connector.createBatchWriter(table, 100l * 1024 * 1024, 5 * 1000l, 10);
+    writer = new AtomicReference<BatchWriter>(this.connector.createBatchWriter(table, 100l * 1024 * 1024, 5 * 1000l, 10));
   }
   
   public void run() throws Exception {
@@ -187,25 +203,39 @@ public class TraceServer implements Watcher {
   
   private void flush() {
     try {
-      writer.flush();
-    } catch (Exception e) {
-      log.error("Error flushing traces", e);
+      final BatchWriter writer = this.writer.get();
+      if (null != writer) {
+        writer.flush();
+      }
+    } catch (MutationsRejectedException exception) {
+      log.warn("Problem flushing traces, resetting writer. Set log level to DEBUG to see stacktrace. cause: " + exception);
+      log.debug("flushing traces failed due to exception", exception);
+      resetWriter();
+    /* XXX e.g. if the writer was closed between when we grabbed it and when we called flush. */
+    } catch (RuntimeException exception) {
+      log.warn("Problem flushing traces, resetting writer. Set log level to DEBUG to see stacktrace. cause: " + exception);
+      log.debug("flushing traces failed due to exception", exception);
       resetWriter();
     }
   }
   
-  synchronized private void resetWriter() {
+  private void resetWriter() {
+    BatchWriter writer = null;
     try {
-      if (writer != null)
-        writer.close();
+      writer = connector.createBatchWriter(table, 100l * 1024 * 1024, 5 * 1000l, 10);
     } catch (Exception ex) {
-      log.error("Error closing batch writer", ex);
+      log.warn("Unable to create a batch writer, will retry. Set log level to DEBUG to see stacktrace. cause: " + ex);
+      log.debug("batch writer creation failed with exception.", ex);
     } finally {
-      writer = null;
+      /* Trade in the new writer (even if null) for the one we need to close. */
+      writer = this.writer.getAndSet(writer);
       try {
-        writer = connector.createBatchWriter(table, 100l * 1024 * 1024, 5 * 1000l, 10);
+        if (null != writer) {
+          writer.close();
+        }
       } catch (Exception ex) {
-        log.error("Unable to create a batch writer: " + ex);
+        log.warn("Problem closing batch writer. Set log level to DEBUG to see stacktrace. cause: " + ex);
+        log.debug("batch writer close failed with exception", ex);
       }
     }
   }