You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2014/01/17 16:24:36 UTC
[1/3] git commit: ACCUMULO-2209 reset the batch writer on any
exception
Updated Branches:
refs/heads/1.6.0-SNAPSHOT 1b5c054d5 -> c35b70c62
ACCUMULO-2209 reset the batch writer on any exception
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/99c4361a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/99c4361a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/99c4361a
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 99c4361aea0878fbbce1b855492fa67f630f0c60
Parents: 0ffb01a
Author: Eric Newton <er...@gmail.com>
Authored: Fri Jan 17 10:20:14 2014 -0500
Committer: Eric Newton <er...@gmail.com>
Committed: Fri Jan 17 10:20:24 2014 -0500
----------------------------------------------------------------------
.../main/java/org/apache/accumulo/server/trace/TraceServer.java | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/99c4361a/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
----------------------------------------------------------------------
diff --git a/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java b/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
index 8e61a89..4d89e9c 100644
--- a/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
+++ b/src/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
@@ -27,7 +27,6 @@ import org.apache.accumulo.cloudtrace.thrift.SpanReceiver;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
@@ -189,7 +188,7 @@ public class TraceServer implements Watcher {
private void flush() {
try {
writer.flush();
- } catch (MutationsRejectedException e) {
+ } catch (Exception e) {
log.error("Error flushing traces", e);
resetWriter();
}
[2/3] git commit: ACCUMULO-2209 reset the batch writer on any
exception
Posted by ec...@apache.org.
ACCUMULO-2209 reset the batch writer on any exception
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/75f27e5d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/75f27e5d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/75f27e5d
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 75f27e5d17b2b667438b3ca4ecd59e76d0ec206f
Parents: c42a511 99c4361
Author: Eric Newton <er...@gmail.com>
Authored: Fri Jan 17 10:23:37 2014 -0500
Committer: Eric Newton <er...@gmail.com>
Committed: Fri Jan 17 10:23:37 2014 -0500
----------------------------------------------------------------------
.../main/java/org/apache/accumulo/server/trace/TraceServer.java | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/75f27e5d/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
index 77cdb17,0000000..0777d03
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
+++ b/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
@@@ -1,293 -1,0 +1,292 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.trace;
+
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.channels.ServerSocketChannel;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IteratorSetting;
- import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.Properties;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileUtil;
+import org.apache.accumulo.core.iterators.user.AgeOffFilter;
+import org.apache.accumulo.core.security.SecurityUtil;
+import org.apache.accumulo.core.trace.TraceFormatter;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.server.Accumulo;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.start.classloader.AccumuloClassLoader;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.thrift.RemoteSpan;
+import org.apache.accumulo.trace.thrift.SpanReceiver.Iface;
+import org.apache.accumulo.trace.thrift.SpanReceiver.Processor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TByteArrayOutputStream;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+public class TraceServer implements Watcher {
+
+ final private static Logger log = Logger.getLogger(TraceServer.class);
+ final private ServerConfiguration serverConfiguration;
+ final private TServer server;
+ private BatchWriter writer = null;
+ private Connector connector;
+ final String table;
+
+ private static void put(Mutation m, String cf, String cq, byte[] bytes, int len) {
+ m.put(new Text(cf), new Text(cq), new Value(bytes, 0, len));
+ }
+
+ static class ByteArrayTransport extends TTransport {
+ TByteArrayOutputStream out = new TByteArrayOutputStream();
+
+ @Override
+ public boolean isOpen() {
+ return true;
+ }
+
+ @Override
+ public void open() throws TTransportException {}
+
+ @Override
+ public void close() {}
+
+ @Override
+ public int read(byte[] buf, int off, int len) {
+ return 0;
+ }
+
+ @Override
+ public void write(byte[] buf, int off, int len) throws TTransportException {
+ out.write(buf, off, len);
+ }
+
+ public byte[] get() {
+ return out.get();
+ }
+
+ public int len() {
+ return out.len();
+ }
+ }
+
+ class Receiver implements Iface {
+ @Override
+ public void span(RemoteSpan s) throws TException {
+ String idString = Long.toHexString(s.traceId);
+ String startString = Long.toHexString(s.start);
+ Mutation spanMutation = new Mutation(new Text(idString));
+ Mutation indexMutation = new Mutation(new Text("idx:" + s.svc + ":" + startString));
+ long diff = s.stop - s.start;
+ indexMutation.put(new Text(s.description), new Text(s.sender), new Value((idString + ":" + Long.toHexString(diff)).getBytes()));
+ ByteArrayTransport transport = new ByteArrayTransport();
+ TCompactProtocol protocol = new TCompactProtocol(transport);
+ s.write(protocol);
+ String parentString = Long.toHexString(s.parentId);
+ if (s.parentId == Span.ROOT_SPAN_ID)
+ parentString = "";
+ put(spanMutation, "span", parentString + ":" + Long.toHexString(s.spanId), transport.get(), transport.len());
+ // Map the root span to time so we can look up traces by time
+ Mutation timeMutation = null;
+ if (s.parentId == Span.ROOT_SPAN_ID) {
+ timeMutation = new Mutation(new Text("start:" + startString));
+ put(timeMutation, "id", idString, transport.get(), transport.len());
+ }
+ try {
+ if (writer == null)
+ resetWriter();
+ if (writer == null)
+ return;
+ writer.addMutation(spanMutation);
+ writer.addMutation(indexMutation);
+ if (timeMutation != null)
+ writer.addMutation(timeMutation);
+ } catch (Exception ex) {
+ log.error("Unable to write mutation to table: " + spanMutation, ex);
+ }
+ }
+
+ }
+
+ public TraceServer(ServerConfiguration serverConfiguration, String hostname) throws Exception {
+ this.serverConfiguration = serverConfiguration;
+ AccumuloConfiguration conf = serverConfiguration.getConfiguration();
+ table = conf.get(Property.TRACE_TABLE);
+ while (true) {
+ try {
+ String principal = conf.get(Property.TRACE_USER);
+ AuthenticationToken at;
+ Map<String,String> loginMap = conf.getAllPropertiesWithPrefix(Property.TRACE_TOKEN_PROPERTY_PREFIX);
+ if (loginMap.isEmpty()) {
+ Property p = Property.TRACE_PASSWORD;
+ at = new PasswordToken(conf.get(p).getBytes());
+ } else {
+ Properties props = new Properties();
+ AuthenticationToken token = AccumuloClassLoader.getClassLoader().loadClass(conf.get(Property.TRACE_TOKEN_TYPE)).asSubclass(AuthenticationToken.class)
+ .newInstance();
+
+ int prefixLength = Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey().length() + 1;
+ for (Entry<String,String> entry : loginMap.entrySet()) {
+ props.put(entry.getKey().substring(prefixLength), entry.getValue());
+ }
+
+ token.init(props);
+
+ at = token;
+ }
+
+ connector = serverConfiguration.getInstance().getConnector(principal, at);
+ if (!connector.tableOperations().exists(table)) {
+ connector.tableOperations().create(table);
+ IteratorSetting setting = new IteratorSetting(10, "ageoff", AgeOffFilter.class.getName());
+ AgeOffFilter.setTTL(setting, 7 * 24 * 60 * 60 * 1000l);
+ connector.tableOperations().attachIterator(table, setting);
+ }
+ connector.tableOperations().setProperty(table, Property.TABLE_FORMATTER_CLASS.getKey(), TraceFormatter.class.getName());
+ break;
+ } catch (Exception ex) {
+ log.info("Waiting to checking/create the trace table.", ex);
+ UtilWaitThread.sleep(1000);
+ }
+ }
+
+ int port = conf.getPort(Property.TRACE_PORT);
+ final ServerSocket sock = ServerSocketChannel.open().socket();
+ sock.setReuseAddress(true);
+ sock.bind(new InetSocketAddress(port));
+ final TServerTransport transport = new TServerSocket(sock);
+ TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
+ options.processor(new Processor<Iface>(new Receiver()));
+ server = new TThreadPoolServer(options);
+ final InetSocketAddress address = new InetSocketAddress(hostname, sock.getLocalPort());
+ registerInZooKeeper(AddressUtil.toString(address));
+
+ writer = connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(5, TimeUnit.SECONDS));
+ }
+
+ public void run() throws Exception {
+ SimpleTimer.getInstance().schedule(new Runnable() {
+ @Override
+ public void run() {
+ flush();
+ }
+ }, 1000, 1000);
+ server.serve();
+ }
+
+ private void flush() {
+ try {
+ writer.flush();
- } catch (MutationsRejectedException e) {
++ } catch (Exception e) {
+ log.error("Error flushing traces", e);
+ resetWriter();
+ }
+ }
+
+ synchronized private void resetWriter() {
+ try {
+ if (writer != null)
+ writer.close();
+ } catch (Exception ex) {
+ log.error("Error closing batch writer", ex);
+ } finally {
+ writer = null;
+ try {
+ writer = connector.createBatchWriter(table, new BatchWriterConfig());
+ } catch (Exception ex) {
+ log.error("Unable to create a batch writer: " + ex);
+ }
+ }
+ }
+
+ private void registerInZooKeeper(String name) throws Exception {
+ String root = ZooUtil.getRoot(serverConfiguration.getInstance()) + Constants.ZTRACERS;
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+ String path = zoo.putEphemeralSequential(root + "/trace-", name.getBytes());
+ zoo.exists(path, this);
+ }
+
+ public static void main(String[] args) throws Exception {
+ SecurityUtil.serverLogin();
+ Instance instance = HdfsZooInstance.getInstance();
+ ServerConfiguration conf = new ServerConfiguration(instance);
+ FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), conf.getConfiguration());
+ Accumulo.init(fs, conf, "tracer");
+ String hostname = Accumulo.getLocalAddress(args);
+ TraceServer server = new TraceServer(conf, hostname);
+ Accumulo.enableTracing(hostname, "tserver");
+ server.run();
+ log.info("tracer stopping");
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ log.debug("event " + event.getPath() + " " + event.getType() + " " + event.getState());
+ if (event.getState() == KeeperState.Expired) {
+ log.warn("Trace server lost zookeeper registration at " + event.getPath());
+ server.stop();
+ } else if (event.getType() == EventType.NodeDeleted) {
+ log.warn("Trace server zookeeper entry lost " + event.getPath());
+ server.stop();
+ }
+ if (event.getPath() != null) {
+ try {
+ if (ZooReaderWriter.getInstance().exists(event.getPath(), this))
+ return;
+ } catch (Exception ex) {
+ log.error(ex, ex);
+ }
+ log.warn("Trace server unable to reset watch on zookeeper registration");
+ server.stop();
+ }
+ }
+
+}
[3/3] git commit: Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT
Posted by ec...@apache.org.
Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c35b70c6
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c35b70c6
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c35b70c6
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: c35b70c6243a1e311b4184d83b4cb14f257a8cc6
Parents: 1b5c054 75f27e5
Author: Eric Newton <er...@gmail.com>
Authored: Fri Jan 17 10:23:47 2014 -0500
Committer: Eric Newton <er...@gmail.com>
Committed: Fri Jan 17 10:23:47 2014 -0500
----------------------------------------------------------------------
.../src/main/java/org/apache/accumulo/tracer/TraceServer.java | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c35b70c6/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
----------------------------------------------------------------------
diff --cc server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
index 8bd985f,0000000..d513ebc
mode 100644,000000..100644
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
@@@ -1,292 -1,0 +1,291 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tracer;
+
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.channels.ServerSocketChannel;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IteratorSetting;
- import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.Properties;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.AgeOffFilter;
+import org.apache.accumulo.core.security.SecurityUtil;
+import org.apache.accumulo.core.trace.TraceFormatter;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.server.Accumulo;
+import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.start.classloader.AccumuloClassLoader;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.thrift.RemoteSpan;
+import org.apache.accumulo.trace.thrift.SpanReceiver.Iface;
+import org.apache.accumulo.trace.thrift.SpanReceiver.Processor;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TByteArrayOutputStream;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+public class TraceServer implements Watcher {
+
+ final private static Logger log = Logger.getLogger(TraceServer.class);
+ final private ServerConfiguration serverConfiguration;
+ final private TServer server;
+ private BatchWriter writer = null;
+ private Connector connector;
+ final String table;
+
+ private static void put(Mutation m, String cf, String cq, byte[] bytes, int len) {
+ m.put(new Text(cf), new Text(cq), new Value(bytes, 0, len));
+ }
+
+ static class ByteArrayTransport extends TTransport {
+ TByteArrayOutputStream out = new TByteArrayOutputStream();
+
+ @Override
+ public boolean isOpen() {
+ return true;
+ }
+
+ @Override
+ public void open() throws TTransportException {}
+
+ @Override
+ public void close() {}
+
+ @Override
+ public int read(byte[] buf, int off, int len) {
+ return 0;
+ }
+
+ @Override
+ public void write(byte[] buf, int off, int len) throws TTransportException {
+ out.write(buf, off, len);
+ }
+
+ public byte[] get() {
+ return out.get();
+ }
+
+ public int len() {
+ return out.len();
+ }
+ }
+
+ class Receiver implements Iface {
+ @Override
+ public void span(RemoteSpan s) throws TException {
+ String idString = Long.toHexString(s.traceId);
+ String startString = Long.toHexString(s.start);
+ Mutation spanMutation = new Mutation(new Text(idString));
+ Mutation indexMutation = new Mutation(new Text("idx:" + s.svc + ":" + startString));
+ long diff = s.stop - s.start;
+ indexMutation.put(new Text(s.description), new Text(s.sender), new Value((idString + ":" + Long.toHexString(diff)).getBytes()));
+ ByteArrayTransport transport = new ByteArrayTransport();
+ TCompactProtocol protocol = new TCompactProtocol(transport);
+ s.write(protocol);
+ String parentString = Long.toHexString(s.parentId);
+ if (s.parentId == Span.ROOT_SPAN_ID)
+ parentString = "";
+ put(spanMutation, "span", parentString + ":" + Long.toHexString(s.spanId), transport.get(), transport.len());
+ // Map the root span to time so we can look up traces by time
+ Mutation timeMutation = null;
+ if (s.parentId == Span.ROOT_SPAN_ID) {
+ timeMutation = new Mutation(new Text("start:" + startString));
+ put(timeMutation, "id", idString, transport.get(), transport.len());
+ }
+ try {
+ if (writer == null)
+ resetWriter();
+ if (writer == null)
+ return;
+ writer.addMutation(spanMutation);
+ writer.addMutation(indexMutation);
+ if (timeMutation != null)
+ writer.addMutation(timeMutation);
+ } catch (Exception ex) {
+ log.error("Unable to write mutation to table: " + spanMutation, ex);
+ }
+ }
+
+ }
+
+ public TraceServer(ServerConfiguration serverConfiguration, String hostname) throws Exception {
+ this.serverConfiguration = serverConfiguration;
+ AccumuloConfiguration conf = serverConfiguration.getConfiguration();
+ table = conf.get(Property.TRACE_TABLE);
+ while (true) {
+ try {
+ String principal = conf.get(Property.TRACE_USER);
+ AuthenticationToken at;
+ Map<String,String> loginMap = conf.getAllPropertiesWithPrefix(Property.TRACE_TOKEN_PROPERTY_PREFIX);
+ if (loginMap.isEmpty()) {
+ Property p = Property.TRACE_PASSWORD;
+ at = new PasswordToken(conf.get(p).getBytes());
+ } else {
+ Properties props = new Properties();
+ AuthenticationToken token = AccumuloClassLoader.getClassLoader().loadClass(conf.get(Property.TRACE_TOKEN_TYPE)).asSubclass(AuthenticationToken.class)
+ .newInstance();
+
+ int prefixLength = Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey().length() + 1;
+ for (Entry<String,String> entry : loginMap.entrySet()) {
+ props.put(entry.getKey().substring(prefixLength), entry.getValue());
+ }
+
+ token.init(props);
+
+ at = token;
+ }
+
+ connector = serverConfiguration.getInstance().getConnector(principal, at);
+ if (!connector.tableOperations().exists(table)) {
+ connector.tableOperations().create(table);
+ IteratorSetting setting = new IteratorSetting(10, "ageoff", AgeOffFilter.class.getName());
+ AgeOffFilter.setTTL(setting, 7 * 24 * 60 * 60 * 1000l);
+ connector.tableOperations().attachIterator(table, setting);
+ }
+ connector.tableOperations().setProperty(table, Property.TABLE_FORMATTER_CLASS.getKey(), TraceFormatter.class.getName());
+ break;
+ } catch (Exception ex) {
+ log.info("Waiting to checking/create the trace table.", ex);
+ UtilWaitThread.sleep(1000);
+ }
+ }
+
+ int port = conf.getPort(Property.TRACE_PORT);
+ final ServerSocket sock = ServerSocketChannel.open().socket();
+ sock.setReuseAddress(true);
+ sock.bind(new InetSocketAddress(hostname, port));
+ final TServerTransport transport = new TServerSocket(sock);
+ TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
+ options.processor(new Processor<Iface>(new Receiver()));
+ server = new TThreadPoolServer(options);
+ registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort());
+ writer = connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(5, TimeUnit.SECONDS));
+ }
+
+ public void run() throws Exception {
+ SimpleTimer.getInstance().schedule(new Runnable() {
+ @Override
+ public void run() {
+ flush();
+ }
+ }, 1000, 1000);
+ server.serve();
+ }
+
+ private void flush() {
+ try {
+ writer.flush();
- } catch (MutationsRejectedException e) {
++ } catch (Exception e) {
+ log.error("Error flushing traces", e);
+ resetWriter();
+ }
+ }
+
+ synchronized private void resetWriter() {
+ try {
+ if (writer != null)
+ writer.close();
+ } catch (Exception ex) {
+ log.error("Error closing batch writer", ex);
+ } finally {
+ writer = null;
+ try {
+ writer = connector.createBatchWriter(table, new BatchWriterConfig());
+ } catch (Exception ex) {
+ log.error("Unable to create a batch writer: " + ex);
+ }
+ }
+ }
+
+ private void registerInZooKeeper(String name) throws Exception {
+ String root = ZooUtil.getRoot(serverConfiguration.getInstance()) + Constants.ZTRACERS;
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+ String path = zoo.putEphemeralSequential(root + "/trace-", name.getBytes());
+ zoo.exists(path, this);
+ }
+
+ public static void main(String[] args) throws Exception {
+ SecurityUtil.serverLogin();
+ ServerOpts opts = new ServerOpts();
+ opts.parseArgs("tracer", args);
+ Instance instance = HdfsZooInstance.getInstance();
+ ServerConfiguration conf = new ServerConfiguration(instance);
+ VolumeManager fs = VolumeManagerImpl.get();
+ Accumulo.init(fs, conf, "tracer");
+ String hostname = opts.getAddress();
+ TraceServer server = new TraceServer(conf, hostname);
+ Accumulo.enableTracing(hostname, "tserver");
+ server.run();
+ log.info("tracer stopping");
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ log.debug("event " + event.getPath() + " " + event.getType() + " " + event.getState());
+ if (event.getState() == KeeperState.Expired) {
+ log.warn("Trace server lost zookeeper registration at " + event.getPath());
+ server.stop();
+ } else if (event.getType() == EventType.NodeDeleted) {
+ log.warn("Trace server zookeeper entry lost " + event.getPath());
+ server.stop();
+ }
+ if (event.getPath() != null) {
+ try {
+ if (ZooReaderWriter.getInstance().exists(event.getPath(), this))
+ return;
+ } catch (Exception ex) {
+ log.error(ex, ex);
+ }
+ log.warn("Trace server unable to reset watch on zookeeper registration");
+ server.stop();
+ }
+ }
+
+}