You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2012/01/04 20:58:24 UTC
svn commit: r1227289 - in /incubator/accumulo/branches/1.4/src:
server/src/main/java/org/apache/accumulo/server/trace/
trace/src/main/java/cloudtrace/instrument/receivers/
Author: ecn
Date: Wed Jan 4 19:58:23 2012
New Revision: 1227289
URL: http://svn.apache.org/viewvc?rev=1227289&view=rev
Log:
ACCUMULO-243: close the connection on write failures; drop spans if there are no tracers (merge to 1.4 branch)
Modified:
incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
incubator/accumulo/branches/1.4/src/trace/src/main/java/cloudtrace/instrument/receivers/AsyncSpanReceiver.java
incubator/accumulo/branches/1.4/src/trace/src/main/java/cloudtrace/instrument/receivers/SendSpansViaThrift.java
incubator/accumulo/branches/1.4/src/trace/src/main/java/cloudtrace/instrument/receivers/ZooSpanClient.java
Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java?rev=1227289&r1=1227288&r2=1227289&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java Wed Jan 4 19:58:23 2012
@@ -16,9 +16,7 @@
*/
package org.apache.accumulo.server.trace;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.nio.channels.ServerSocketChannel;
+import java.net.InetAddress;
import java.util.TimerTask;
import org.apache.accumulo.core.Constants;
@@ -29,12 +27,13 @@ import org.apache.accumulo.core.conf.Acc
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.AddressUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.Accumulo;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.util.TServerUtils;
+import org.apache.accumulo.server.util.TServerUtils.ServerPort;
import org.apache.accumulo.server.util.time.SimpleTimer;
import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
@@ -44,9 +43,6 @@ import org.apache.thrift.TByteArrayOutpu
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.WatchedEvent;
@@ -160,16 +156,10 @@ public class TraceServer implements Watc
}
int port = conf.getPort(Property.TRACE_PORT);
- final ServerSocket sock = ServerSocketChannel.open().socket();
- sock.setReuseAddress(true);
- sock.bind(new InetSocketAddress(port));
- final TServerTransport transport = new TServerSocket(sock);
- TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
- options.processor(new SpanReceiver.Processor(new Receiver()));
- server = new TThreadPoolServer(options);
- final InetSocketAddress address = new InetSocketAddress(Accumulo.getLocalAddress(args), sock.getLocalPort());
- registerInZooKeeper(AddressUtil.toString(address));
-
+ ServerPort serverPort = TServerUtils.startTServer(port, new SpanReceiver.Processor(new Receiver()), "tracer", "tracer", 4, 1000l);
+ server = serverPort.server;
+ InetAddress address = Accumulo.getLocalAddress(args);
+ registerInZooKeeper(address.getHostAddress() + ":" + serverPort.port);
writer = connector.createBatchWriter(table, 100l * 1024 * 1024, 5 * 1000l, 10);
}
Modified: incubator/accumulo/branches/1.4/src/trace/src/main/java/cloudtrace/instrument/receivers/AsyncSpanReceiver.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/trace/src/main/java/cloudtrace/instrument/receivers/AsyncSpanReceiver.java?rev=1227289&r1=1227288&r2=1227289&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/trace/src/main/java/cloudtrace/instrument/receivers/AsyncSpanReceiver.java (original)
+++ incubator/accumulo/branches/1.4/src/trace/src/main/java/cloudtrace/instrument/receivers/AsyncSpanReceiver.java Wed Jan 4 19:58:23 2012
@@ -59,7 +59,11 @@ public abstract class AsyncSpanReceiver<
timer.schedule(new TimerTask() {
@Override
public void run() {
- sendSpans();
+ try {
+ sendSpans();
+ } catch (Exception ex) {
+ log.warn("Exception sending spans to destination", ex);
+ }
}
}, 0, millis);
Modified: incubator/accumulo/branches/1.4/src/trace/src/main/java/cloudtrace/instrument/receivers/SendSpansViaThrift.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/trace/src/main/java/cloudtrace/instrument/receivers/SendSpansViaThrift.java?rev=1227289&r1=1227288&r2=1227289&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/trace/src/main/java/cloudtrace/instrument/receivers/SendSpansViaThrift.java (original)
+++ incubator/accumulo/branches/1.4/src/trace/src/main/java/cloudtrace/instrument/receivers/SendSpansViaThrift.java Wed Jan 4 19:58:23 2012
@@ -43,6 +43,8 @@ public class SendSpansViaThrift extends
@Override
protected Client createDestination(String destination) throws Exception {
+ if (destination == null)
+ return null;
try {
String[] hostAddr = destination.split(":", 2);
log.debug("Connecting to " + hostAddr[0] + ":" + hostAddr[1]);
@@ -60,7 +62,14 @@ public class SendSpansViaThrift extends
@Override
protected void send(Client client, RemoteSpan s) throws Exception {
- client.span(s);
+ if (client != null) {
+ try {
+ client.span(s);
+ } catch (Exception ex) {
+ client.getInputProtocol().getTransport().close();
+ client = null;
+ }
+ }
}
protected String getSpanKey(Map<String,String> data) {
Modified: incubator/accumulo/branches/1.4/src/trace/src/main/java/cloudtrace/instrument/receivers/ZooSpanClient.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/trace/src/main/java/cloudtrace/instrument/receivers/ZooSpanClient.java?rev=1227289&r1=1227288&r2=1227289&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/trace/src/main/java/cloudtrace/instrument/receivers/ZooSpanClient.java (original)
+++ incubator/accumulo/branches/1.4/src/trace/src/main/java/cloudtrace/instrument/receivers/ZooSpanClient.java Wed Jan 4 19:58:23 2012
@@ -72,6 +72,37 @@ public class ZooSpanClient extends SendS
zoo.getChildren(path, true);
}
+ /*
+ * (non-Javadoc)
+ *
+ * @see cloudtrace.instrument.receivers.AsyncSpanReceiver#flush()
+ */
+ @Override
+ public void flush() {
+ if (!hosts.isEmpty())
+ super.flush();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see cloudtrace.instrument.receivers.AsyncSpanReceiver#sendSpans()
+ */
+ @Override
+ void sendSpans() {
+ if (hosts.isEmpty()) {
+ if (!sendQueue.isEmpty()) {
+ log.error("No hosts to send data to, dropping queued spans");
+ synchronized (sendQueue) {
+ sendQueue.clear();
+ sendQueue.notifyAll();
+ }
+ }
+ } else {
+ super.sendSpans();
+ }
+ }
+
synchronized private void updateHosts(String path, List<String> children) {
log.debug("Scanning trace hosts in zookeeper: " + path);
try {