You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2011/12/12 19:59:55 UTC

svn commit: r1213367 - in /incubator/accumulo/trunk: ./ src/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java

Author: ecn
Date: Mon Dec 12 18:59:55 2011
New Revision: 1213367

URL: http://svn.apache.org/viewvc?rev=1213367&view=rev
Log:
ACCUMULO-215: merge to trunk

Modified:
    incubator/accumulo/trunk/   (props changed)
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java

Propchange: incubator/accumulo/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Dec 12 18:59:55 2011
@@ -1,2 +1,2 @@
 /incubator/accumulo/branches/1.3:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598
-/incubator/accumulo/branches/1.4:1201902-1212534,1213231,1213235
+/incubator/accumulo/branches/1.4:1201902-1212534,1213231,1213235,1213363

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java?rev=1213367&r1=1213366&r2=1213367&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java Mon Dec 12 18:59:55 2011
@@ -47,7 +47,7 @@ public class ZooTraceClient extends Send
     super(host, service, millis);
     this.path = path;
     this.zoo = zoo;
-    zoo.getChildren(path, this);
+    updateHosts(path, zoo.getChildren(path, this));
   }
   
   @Override

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java?rev=1213367&r1=1213366&r2=1213367&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java Mon Dec 12 18:59:55 2011
@@ -63,8 +63,10 @@ public class TraceServer implements Watc
   final private static Logger log = Logger.getLogger(TraceServer.class);
   final private AccumuloConfiguration conf;
   final private TServer server;
-  final private BatchWriter writer;
-  
+  private BatchWriter writer = null;
+  private Connector connector;
+  final String table;
+
   private static void put(Mutation m, String cf, String cq, byte[] bytes, int len) {
     m.put(new Text(cf), new Text(cq), new Value(bytes, 0, len));
   }
@@ -125,6 +127,10 @@ public class TraceServer implements Watc
         put(timeMutation, "id", idString, transport.get(), transport.len());
       }
       try {
+        if (writer == null)
+          resetWriter();
+        if (writer == null)
+          return;
         writer.addMutation(spanMutation);
         writer.addMutation(indexMutation);
         if (timeMutation != null)
@@ -139,8 +145,7 @@ public class TraceServer implements Watc
   public TraceServer(String args[]) throws Exception {
     Accumulo.init("tracer");
     conf = ServerConfiguration.getSystemConfiguration();
-    final String table = conf.get(Property.TRACE_TABLE);
-    Connector connector;
+    table = conf.get(Property.TRACE_TABLE);
     while (true) {
       try {
         connector = HdfsZooInstance.getInstance().getConnector(conf.get(Property.TRACE_USER), conf.get(Property.TRACE_PASSWORD).getBytes());
@@ -183,9 +188,27 @@ public class TraceServer implements Watc
       writer.flush();
     } catch (MutationsRejectedException e) {
       log.error("Error flushing traces", e);
+      resetWriter();
+    }
+  }
+  
+  synchronized private void resetWriter() {
+    try {
+      if (writer != null)
+        writer.close();
+    } catch (Exception ex) {
+      log.error("Error closing batch writer", ex);
+    } finally {
+      writer = null;
+      try {
+        writer = connector.createBatchWriter(table, 100l * 1024 * 1024, 5 * 1000l, 10);
+      } catch (Exception ex) {
+        log.error("Unable to create a batch writer: " + ex);
+      }
     }
   }
   
+
   private void registerInZooKeeper(String name) throws Exception {
     String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZTRACERS;
     IZooReaderWriter zoo = ZooReaderWriter.getInstance();