You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bu...@apache.org on 2014/01/23 08:36:35 UTC
[17/23] Merge branch '1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4cd3b1b/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
index 0777d03,0000000..116f134
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
+++ b/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
@@@ -1,292 -1,0 +1,323 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.trace;
+
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.channels.ServerSocketChannel;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
++import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.Properties;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
++import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileUtil;
+import org.apache.accumulo.core.iterators.user.AgeOffFilter;
+import org.apache.accumulo.core.security.SecurityUtil;
+import org.apache.accumulo.core.trace.TraceFormatter;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.server.Accumulo;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.start.classloader.AccumuloClassLoader;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.thrift.RemoteSpan;
+import org.apache.accumulo.trace.thrift.SpanReceiver.Iface;
+import org.apache.accumulo.trace.thrift.SpanReceiver.Processor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TByteArrayOutputStream;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+public class TraceServer implements Watcher {
+
+ final private static Logger log = Logger.getLogger(TraceServer.class);
+ final private ServerConfiguration serverConfiguration;
+ final private TServer server;
- private BatchWriter writer = null;
- private Connector connector;
++ final private AtomicReference<BatchWriter> writer;
++ final private Connector connector;
+ final String table;
+
+ private static void put(Mutation m, String cf, String cq, byte[] bytes, int len) {
+ m.put(new Text(cf), new Text(cq), new Value(bytes, 0, len));
+ }
+
+ static class ByteArrayTransport extends TTransport {
+ TByteArrayOutputStream out = new TByteArrayOutputStream();
+
+ @Override
+ public boolean isOpen() {
+ return true;
+ }
+
+ @Override
+ public void open() throws TTransportException {}
+
+ @Override
+ public void close() {}
+
+ @Override
+ public int read(byte[] buf, int off, int len) {
+ return 0;
+ }
+
+ @Override
+ public void write(byte[] buf, int off, int len) throws TTransportException {
+ out.write(buf, off, len);
+ }
+
+ public byte[] get() {
+ return out.get();
+ }
+
+ public int len() {
+ return out.len();
+ }
+ }
+
+ class Receiver implements Iface {
+ @Override
+ public void span(RemoteSpan s) throws TException {
+ String idString = Long.toHexString(s.traceId);
+ String startString = Long.toHexString(s.start);
+ Mutation spanMutation = new Mutation(new Text(idString));
+ Mutation indexMutation = new Mutation(new Text("idx:" + s.svc + ":" + startString));
+ long diff = s.stop - s.start;
+ indexMutation.put(new Text(s.description), new Text(s.sender), new Value((idString + ":" + Long.toHexString(diff)).getBytes()));
+ ByteArrayTransport transport = new ByteArrayTransport();
+ TCompactProtocol protocol = new TCompactProtocol(transport);
+ s.write(protocol);
+ String parentString = Long.toHexString(s.parentId);
+ if (s.parentId == Span.ROOT_SPAN_ID)
+ parentString = "";
+ put(spanMutation, "span", parentString + ":" + Long.toHexString(s.spanId), transport.get(), transport.len());
+ // Map the root span to time so we can look up traces by time
+ Mutation timeMutation = null;
+ if (s.parentId == Span.ROOT_SPAN_ID) {
+ timeMutation = new Mutation(new Text("start:" + startString));
+ put(timeMutation, "id", idString, transport.get(), transport.len());
+ }
+ try {
- if (writer == null)
- resetWriter();
- if (writer == null)
++ final BatchWriter writer = TraceServer.this.writer.get();
++ /* Check for null, because we expect spans to come in much faster than flush calls.
++ In the case of failure, we'd rather avoid logging tons of NPEs.
++ */
++ if (null == writer) {
++ log.warn("writer is not ready; discarding span.");
+ return;
++ }
+ writer.addMutation(spanMutation);
+ writer.addMutation(indexMutation);
+ if (timeMutation != null)
+ writer.addMutation(timeMutation);
- } catch (Exception ex) {
- log.error("Unable to write mutation to table: " + spanMutation, ex);
++ } catch (MutationsRejectedException exception) {
++ log.warn("Unable to write mutation to table; discarding span. set log level to DEBUG for span information and stacktrace. cause: " + exception);
++ if (log.isDebugEnabled()) {
++ log.debug("discarded span due to rejection of mutation: " + spanMutation, exception);
++ }
++ /* XXX this could be e.g. an IllegalArgumentExceptoion if we're trying to write this mutation to a writer that has been closed since we retrieved it */
++ } catch (RuntimeException exception) {
++ log.warn("Unable to write mutation to table; discarding span. set log level to DEBUG for stacktrace. cause: " + exception);
++ log.debug("unable to write mutation to table due to exception.", exception);
+ }
+ }
+
+ }
+
+ public TraceServer(ServerConfiguration serverConfiguration, String hostname) throws Exception {
+ this.serverConfiguration = serverConfiguration;
+ AccumuloConfiguration conf = serverConfiguration.getConfiguration();
+ table = conf.get(Property.TRACE_TABLE);
++ Connector connector = null;
+ while (true) {
+ try {
+ String principal = conf.get(Property.TRACE_USER);
+ AuthenticationToken at;
+ Map<String,String> loginMap = conf.getAllPropertiesWithPrefix(Property.TRACE_TOKEN_PROPERTY_PREFIX);
+ if (loginMap.isEmpty()) {
+ Property p = Property.TRACE_PASSWORD;
+ at = new PasswordToken(conf.get(p).getBytes());
+ } else {
+ Properties props = new Properties();
+ AuthenticationToken token = AccumuloClassLoader.getClassLoader().loadClass(conf.get(Property.TRACE_TOKEN_TYPE)).asSubclass(AuthenticationToken.class)
+ .newInstance();
+
+ int prefixLength = Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey().length() + 1;
+ for (Entry<String,String> entry : loginMap.entrySet()) {
+ props.put(entry.getKey().substring(prefixLength), entry.getValue());
+ }
+
+ token.init(props);
+
+ at = token;
+ }
+
+ connector = serverConfiguration.getInstance().getConnector(principal, at);
+ if (!connector.tableOperations().exists(table)) {
+ connector.tableOperations().create(table);
+ IteratorSetting setting = new IteratorSetting(10, "ageoff", AgeOffFilter.class.getName());
+ AgeOffFilter.setTTL(setting, 7 * 24 * 60 * 60 * 1000l);
+ connector.tableOperations().attachIterator(table, setting);
+ }
+ connector.tableOperations().setProperty(table, Property.TABLE_FORMATTER_CLASS.getKey(), TraceFormatter.class.getName());
+ break;
+ } catch (Exception ex) {
+ log.info("Waiting to checking/create the trace table.", ex);
+ UtilWaitThread.sleep(1000);
+ }
+ }
++ this.connector = connector;
++ // make sure we refer to the final variable from now on.
++ connector = null;
+
+ int port = conf.getPort(Property.TRACE_PORT);
+ final ServerSocket sock = ServerSocketChannel.open().socket();
+ sock.setReuseAddress(true);
+ sock.bind(new InetSocketAddress(port));
+ final TServerTransport transport = new TServerSocket(sock);
+ TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
+ options.processor(new Processor<Iface>(new Receiver()));
+ server = new TThreadPoolServer(options);
+ final InetSocketAddress address = new InetSocketAddress(hostname, sock.getLocalPort());
+ registerInZooKeeper(AddressUtil.toString(address));
+
- writer = connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(5, TimeUnit.SECONDS));
++ writer = new AtomicReference<BatchWriter>(this.connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(5, TimeUnit.SECONDS)));
+ }
+
+ public void run() throws Exception {
+ SimpleTimer.getInstance().schedule(new Runnable() {
+ @Override
+ public void run() {
+ flush();
+ }
+ }, 1000, 1000);
+ server.serve();
+ }
+
+ private void flush() {
+ try {
- writer.flush();
- } catch (Exception e) {
- log.error("Error flushing traces", e);
++ final BatchWriter writer = this.writer.get();
++ if (null != writer) {
++ writer.flush();
++ }
++ } catch (MutationsRejectedException exception) {
++ log.warn("Problem flushing traces, resetting writer. Set log level to DEBUG to see stacktrace. cause: " + exception);
++ log.debug("flushing traces failed due to exception", exception);
++ resetWriter();
++ /* XXX e.g. if the writer was closed between when we grabbed it and when we called flush. */
++ } catch (RuntimeException exception) {
++ log.warn("Problem flushing traces, resetting writer. Set log level to DEBUG to see stacktrace. cause: " + exception);
++ log.debug("flushing traces failed due to exception", exception);
+ resetWriter();
+ }
+ }
+
- synchronized private void resetWriter() {
++ private void resetWriter() {
++ BatchWriter writer = null;
+ try {
- if (writer != null)
- writer.close();
++ writer = connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(5, TimeUnit.SECONDS));
+ } catch (Exception ex) {
- log.error("Error closing batch writer", ex);
++ log.warn("Unable to create a batch writer, will retry. Set log level to DEBUG to see stacktrace. cause: " + ex);
++ log.debug("batch writer creation failed with exception.", ex);
+ } finally {
- writer = null;
++ /* Trade in the new writer (even if null) for the one we need to close. */
++ writer = this.writer.getAndSet(writer);
+ try {
- writer = connector.createBatchWriter(table, new BatchWriterConfig());
++ if (null != writer) {
++ writer.close();
++ }
+ } catch (Exception ex) {
- log.error("Unable to create a batch writer: " + ex);
++ log.warn("Problem closing batch writer. Set log level to DEBUG to see stacktrace. cause: " + ex);
++ log.debug("batch writer close failed with exception", ex);
+ }
+ }
+ }
+
+ private void registerInZooKeeper(String name) throws Exception {
+ String root = ZooUtil.getRoot(serverConfiguration.getInstance()) + Constants.ZTRACERS;
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+ String path = zoo.putEphemeralSequential(root + "/trace-", name.getBytes());
+ zoo.exists(path, this);
+ }
+
+ public static void main(String[] args) throws Exception {
+ SecurityUtil.serverLogin();
+ Instance instance = HdfsZooInstance.getInstance();
+ ServerConfiguration conf = new ServerConfiguration(instance);
+ FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), conf.getConfiguration());
+ Accumulo.init(fs, conf, "tracer");
+ String hostname = Accumulo.getLocalAddress(args);
+ TraceServer server = new TraceServer(conf, hostname);
+ Accumulo.enableTracing(hostname, "tserver");
+ server.run();
+ log.info("tracer stopping");
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ log.debug("event " + event.getPath() + " " + event.getType() + " " + event.getState());
+ if (event.getState() == KeeperState.Expired) {
+ log.warn("Trace server lost zookeeper registration at " + event.getPath());
+ server.stop();
+ } else if (event.getType() == EventType.NodeDeleted) {
+ log.warn("Trace server zookeeper entry lost " + event.getPath());
+ server.stop();
+ }
+ if (event.getPath() != null) {
+ try {
+ if (ZooReaderWriter.getInstance().exists(event.getPath(), this))
+ return;
+ } catch (Exception ex) {
+ log.error(ex, ex);
+ }
+ log.warn("Trace server unable to reset watch on zookeeper registration");
+ server.stop();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4cd3b1b/server/src/main/java/org/apache/accumulo/server/util/TabletOperations.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/util/TabletOperations.java
index 4adb992,0000000..dc1b89c
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/util/TabletOperations.java
+++ b/server/src/main/java/org/apache/accumulo/server/util/TabletOperations.java
@@@ -1,76 -1,0 +1,84 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
++import java.net.UnknownHostException;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.tabletserver.UniqueNameAllocator;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+public class TabletOperations {
+
+ private static final Logger log = Logger.getLogger(TabletOperations.class);
+
+ public static String createTabletDirectory(FileSystem fs, String tableDir, Text endRow) {
+ String lowDirectory;
+
+ UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
+
+ while (true) {
+ try {
+ if (endRow == null) {
+ lowDirectory = Constants.DEFAULT_TABLET_LOCATION;
+ Path lowDirectoryPath = new Path(tableDir + lowDirectory);
+ if (fs.exists(lowDirectoryPath) || fs.mkdirs(lowDirectoryPath))
+ return lowDirectory;
+ log.warn("Failed to create " + lowDirectoryPath + " for unknown reason");
+ } else {
+ lowDirectory = "/" + Constants.GENERATED_TABLET_DIRECTORY_PREFIX + namer.getNextName();
+ Path lowDirectoryPath = new Path(tableDir + lowDirectory);
+ if (fs.exists(lowDirectoryPath))
+ throw new IllegalStateException("Dir exist when it should not " + lowDirectoryPath);
+ if (fs.mkdirs(lowDirectoryPath))
+ return lowDirectory;
+ }
+ } catch (IOException e) {
+ log.warn(e);
+ }
+
+ log.warn("Failed to create dir for tablet in table " + tableDir + " will retry ...");
+ UtilWaitThread.sleep(3000);
+
+ }
+ }
+
+ public static String createTabletDirectory(String tableDir, Text endRow) {
+ while (true) {
+ try {
+ FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
+ return createTabletDirectory(fs, tableDir, endRow);
+ } catch (IOException e) {
- log.warn(e);
++ log.warn("problem creating tablet directory", e);
++ } catch (IllegalArgumentException exception) {
++ /* thrown in some edge cases of DNS failure by Hadoop instead of UnknownHostException */
++ if (exception.getCause() instanceof UnknownHostException) {
++ log.warn("problem creating tablet directory", exception);
++ } else {
++ throw exception;
++ }
+ }
+ UtilWaitThread.sleep(3000);
+ }
+ }
+}