You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2012/01/04 21:12:31 UTC

svn commit: r1227296 - in /incubator/accumulo/trunk/src: server/src/main/java/org/apache/accumulo/server/trace/ trace/src/main/java/cloudtrace/instrument/receivers/

Author: ecn
Date: Wed Jan  4 20:12:31 2012
New Revision: 1227296

URL: http://svn.apache.org/viewvc?rev=1227296&view=rev
Log:
ACCUMULO-243: close the connection on write failures; drop spans if there are no tracers (merge to trunk)

Modified:
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
    incubator/accumulo/trunk/src/trace/src/main/java/cloudtrace/instrument/receivers/AsyncSpanReceiver.java
    incubator/accumulo/trunk/src/trace/src/main/java/cloudtrace/instrument/receivers/SendSpansViaThrift.java
    incubator/accumulo/trunk/src/trace/src/main/java/cloudtrace/instrument/receivers/ZooSpanClient.java

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java?rev=1227296&r1=1227295&r2=1227296&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java Wed Jan  4 20:12:31 2012
@@ -16,9 +16,7 @@
  */
 package org.apache.accumulo.server.trace;
 
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.nio.channels.ServerSocketChannel;
+import java.net.InetAddress;
 import java.util.TimerTask;
 
 import org.apache.accumulo.core.Constants;
@@ -29,12 +27,13 @@ import org.apache.accumulo.core.conf.Acc
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.Accumulo;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.util.TServerUtils;
+import org.apache.accumulo.server.util.TServerUtils.ServerPort;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
@@ -44,9 +43,6 @@ import org.apache.thrift.TByteArrayOutpu
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TServerTransport;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.zookeeper.WatchedEvent;
@@ -160,16 +156,10 @@ public class TraceServer implements Watc
     }
     
     int port = conf.getPort(Property.TRACE_PORT);
-    final ServerSocket sock = ServerSocketChannel.open().socket();
-    sock.setReuseAddress(true);
-    sock.bind(new InetSocketAddress(port));
-    final TServerTransport transport = new TServerSocket(sock);
-    TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
-    options.processor(new SpanReceiver.Processor(new Receiver()));
-    server = new TThreadPoolServer(options);
-    final InetSocketAddress address = new InetSocketAddress(Accumulo.getLocalAddress(args), sock.getLocalPort());
-    registerInZooKeeper(AddressUtil.toString(address));
-    
+    ServerPort serverPort = TServerUtils.startTServer(port, new SpanReceiver.Processor(new Receiver()), "tracer", "tracer", 4, 1000l);
+    server = serverPort.server;
+    InetAddress address = Accumulo.getLocalAddress(args);
+    registerInZooKeeper(address.getHostAddress() + ":" + serverPort.port);
     writer = connector.createBatchWriter(table, 100l * 1024 * 1024, 5 * 1000l, 10);
   }
   

Modified: incubator/accumulo/trunk/src/trace/src/main/java/cloudtrace/instrument/receivers/AsyncSpanReceiver.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/trace/src/main/java/cloudtrace/instrument/receivers/AsyncSpanReceiver.java?rev=1227296&r1=1227295&r2=1227296&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/trace/src/main/java/cloudtrace/instrument/receivers/AsyncSpanReceiver.java (original)
+++ incubator/accumulo/trunk/src/trace/src/main/java/cloudtrace/instrument/receivers/AsyncSpanReceiver.java Wed Jan  4 20:12:31 2012
@@ -59,7 +59,11 @@ public abstract class AsyncSpanReceiver<
     timer.schedule(new TimerTask() {
       @Override
       public void run() {
-        sendSpans();
+        try {
+          sendSpans();
+        } catch (Exception ex) {
+          log.warn("Exception sending spans to destination", ex);
+        }
       }
       
     }, 0, millis);

Modified: incubator/accumulo/trunk/src/trace/src/main/java/cloudtrace/instrument/receivers/SendSpansViaThrift.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/trace/src/main/java/cloudtrace/instrument/receivers/SendSpansViaThrift.java?rev=1227296&r1=1227295&r2=1227296&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/trace/src/main/java/cloudtrace/instrument/receivers/SendSpansViaThrift.java (original)
+++ incubator/accumulo/trunk/src/trace/src/main/java/cloudtrace/instrument/receivers/SendSpansViaThrift.java Wed Jan  4 20:12:31 2012
@@ -43,6 +43,8 @@ public class SendSpansViaThrift extends 
   
   @Override
   protected Client createDestination(String destination) throws Exception {
+    if (destination == null)
+      return null;
     try {
       String[] hostAddr = destination.split(":", 2);
       log.debug("Connecting to " + hostAddr[0] + ":" + hostAddr[1]);
@@ -60,7 +62,14 @@ public class SendSpansViaThrift extends 
   
   @Override
   protected void send(Client client, RemoteSpan s) throws Exception {
-    client.span(s);
+    if (client != null) {
+      try {
+        client.span(s);
+      } catch (Exception ex) {
+        client.getInputProtocol().getTransport().close();
+        client = null;
+      }
+    }
   }
   
   protected String getSpanKey(Map<String,String> data) {

Modified: incubator/accumulo/trunk/src/trace/src/main/java/cloudtrace/instrument/receivers/ZooSpanClient.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/trace/src/main/java/cloudtrace/instrument/receivers/ZooSpanClient.java?rev=1227296&r1=1227295&r2=1227296&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/trace/src/main/java/cloudtrace/instrument/receivers/ZooSpanClient.java (original)
+++ incubator/accumulo/trunk/src/trace/src/main/java/cloudtrace/instrument/receivers/ZooSpanClient.java Wed Jan  4 20:12:31 2012
@@ -71,6 +71,37 @@ public class ZooSpanClient extends SendS
     zoo.getChildren(path, true);
   }
   
+  /*
+   * (non-Javadoc)
+   * 
+   * @see cloudtrace.instrument.receivers.AsyncSpanReceiver#flush()
+   */
+  @Override
+  public void flush() {
+    if (!hosts.isEmpty())
+      super.flush();
+  }
+  
+  /*
+   * (non-Javadoc)
+   * 
+   * @see cloudtrace.instrument.receivers.AsyncSpanReceiver#sendSpans()
+   */
+  @Override
+  void sendSpans() {
+    if (hosts.isEmpty()) {
+      if (!sendQueue.isEmpty()) {
+        log.error("No hosts to send data to, dropping queued spans");
+        synchronized (sendQueue) {
+          sendQueue.clear();
+          sendQueue.notifyAll();
+        }
+      }
+    } else {
+      super.sendSpans();
+    }
+  }
+
   synchronized private void updateHosts(String path, List<String> children) {
     log.debug("Scanning trace hosts in zookeeper: " + path);
     try {