You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2014/01/10 22:37:49 UTC
[5/6] git commit: Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT
Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT
Conflicts:
server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7c309097
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7c309097
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7c309097
Branch: refs/heads/master
Commit: 7c309097c40779e96d51dd6c70333b5fb1cb8dd7
Parents: deae04f 3b41d37
Author: Eric Newton <er...@gmail.com>
Authored: Fri Jan 10 16:37:19 2014 -0500
Committer: Eric Newton <er...@gmail.com>
Committed: Fri Jan 10 16:37:19 2014 -0500
----------------------------------------------------------------------
.../apache/accumulo/tserver/log/DfsLogger.java | 56 +++++++++-----------
.../tserver/log/TabletServerLogger.java | 4 +-
2 files changed, 28 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7c309097/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 571d1bc,0000000..cc28ac2
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@@ -1,548 -1,0 +1,544 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH;
+import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START;
+import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
+import static org.apache.accumulo.tserver.logger.LogEvents.MANY_MUTATIONS;
+import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.crypto.CryptoModule;
+import org.apache.accumulo.core.security.crypto.CryptoModuleFactory;
+import org.apache.accumulo.core.security.crypto.CryptoModuleParameters;
+import org.apache.accumulo.core.security.crypto.DefaultCryptoModule;
+import org.apache.accumulo.core.security.crypto.NoFlushOutputStream;
+import org.apache.accumulo.core.util.Daemon;
++import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.StringUtil;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.tserver.TabletMutations;
+import org.apache.accumulo.tserver.logger.LogFileKey;
+import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * A write-ahead log file stored through the {@link VolumeManager}. Entries are appended under this object's monitor; a
+ * background {@link LogSyncingTask} batches queued sync requests and syncs the file once per batch.
+ */
+public class DfsLogger {
+  // Package private so that LogSorter can find this
+  // v2 headers were written by 1.5 (raw crypto options follow); v3 headers are written by open().
+  static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---";
+  static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---";
+
+  private static final Logger log = Logger.getLogger(DfsLogger.class);
+
+  /**
+   * Signals that the log (or the channel beneath it) was closed before an operation could complete.
+   */
+  public static class LogClosedException extends IOException {
+    private static final long serialVersionUID = 1L;
+
+    /** Creates the exception with the fixed message used throughout this class. */
+    public LogClosedException() {
+      super("LogClosed");
+    }
+  }
+
+  /**
+   * Pair of streams produced by {@link DfsLogger#readHeaderAndReturnStream}: the raw file stream, and the stream that
+   * yields log entries (decrypted when the file was encrypted).
+   */
+  public static class DFSLoggerInputStreams {
+
+    private FSDataInputStream originalInput;
+    private DataInputStream decryptingInputStream;
+
+    public DFSLoggerInputStreams(FSDataInputStream originalInput, DataInputStream decryptingInputStream) {
+      this.decryptingInputStream = decryptingInputStream;
+      this.originalInput = originalInput;
+    }
+
+    /** @return the raw stream opened on the log file */
+    public FSDataInputStream getOriginalInput() {
+      return originalInput;
+    }
+
+    /** @return the stream to read log entries from */
+    public DataInputStream getDecryptingInputStream() {
+      return decryptingInputStream;
+    }
+
+    public void setOriginalInput(FSDataInputStream originalInput) {
+      this.originalInput = originalInput;
+    }
+
+    public void setDecryptingInputStream(DataInputStream decryptingInputStream) {
+      this.decryptingInputStream = decryptingInputStream;
+    }
+  }
+
+
+  /** Server-side facilities a {@link DfsLogger} needs in order to create its log files. */
+  public interface ServerResources {
+    /** @return configuration consulted for WAL replication, block size, sync mode and crypto settings */
+    AccumuloConfiguration getConfiguration();
+
+    /** @return the volume manager on which log files are created */
+    VolumeManager getFileSystem();
+
+    /** @return the set of tablet servers (presumably the live ones; not used within DfsLogger itself) */
+    Set<TServerInstance> getCurrentTServers();
+  }
+
+  /** Pending sync requests; consumed by {@link LogSyncingTask}. */
+  private final LinkedBlockingQueue<DfsLogger.LogWork> workQueue = new LinkedBlockingQueue<DfsLogger.LogWork>();
+
+  /** Guards {@link #closed}; also used to wait for/notify the sync thread during close(). */
+  private final Object closeLock = new Object();
+
+  /** Sentinel queued by close() so the sync thread exits; it carries no latch. */
+  private static final DfsLogger.LogWork CLOSED_MARKER = new DfsLogger.LogWork(null);
+
+  /** Value used for events that carry no mutations. */
+  private static final LogFileValue EMPTY = new LogFileValue();
+
+  /** Set under closeLock once close() begins; nothing may be queued afterwards. */
+  private boolean closed = false;
+
+ private class LogSyncingTask implements Runnable {
+
+ @Override
+ public void run() {
+ ArrayList<DfsLogger.LogWork> work = new ArrayList<DfsLogger.LogWork>();
+ while (true) {
+ work.clear();
+
+ try {
+ work.add(workQueue.take());
+ } catch (InterruptedException ex) {
+ continue;
+ }
+ workQueue.drainTo(work);
+
+ synchronized (closeLock) {
+ if (!closed) {
+ try {
+ sync.invoke(logFile);
+ } catch (Exception ex) {
+ log.warn("Exception syncing " + ex);
+ for (DfsLogger.LogWork logWork : work) {
+ logWork.exception = ex;
+ }
+ }
+ } else {
+ for (DfsLogger.LogWork logWork : work) {
+ logWork.exception = new LogClosedException();
+ }
+ }
+ }
+
+ boolean sawClosedMarker = false;
+ for (DfsLogger.LogWork logWork : work)
+ if (logWork == CLOSED_MARKER)
+ sawClosedMarker = true;
+ else
+ logWork.latch.countDown();
+
+ if (sawClosedMarker) {
+ synchronized (closeLock) {
+ closeLock.notifyAll();
+ }
+ break;
+ }
+ }
+ }
+ }
+
+  /**
+   * A sync request. {@code latch} is counted down by {@link LogSyncingTask} after the sync attempt for the batch that
+   * contained this request; {@code exception} records any write or sync failure. {@link #CLOSED_MARKER} has a null latch.
+   */
+  static class LogWork {
+    // Never reassigned after construction.
+    final CountDownLatch latch;
+    // Written by the writer or the sync thread, read by the waiter after the latch is released.
+    volatile Exception exception;
+
+    public LogWork(CountDownLatch latch) {
+      this.latch = latch;
+    }
+  }
+
+  /**
+   * Caller-side handle for a queued write; {@link #await()} blocks until the sync thread has processed it.
+   */
+  public static class LoggerOperation {
+    private final LogWork work;
+
+    public LoggerOperation(LogWork work) {
+      this.work = work;
+    }
+
+    /**
+     * Blocks until the request has been processed, then rethrows any recorded failure. IOExceptions and
+     * RuntimeExceptions are rethrown as-is; any other exception, or an interrupt while waiting, is wrapped in a
+     * RuntimeException.
+     */
+    public void await() throws IOException {
+      try {
+        work.latch.await();
+      } catch (InterruptedException e) {
+        // NOTE(review): the interrupt status is not restored. TabletServerLogger.write retries in a loop on failure, so
+        // confirm that re-interrupting here cannot make that loop spin before changing it.
+        throw new RuntimeException(e);
+      }
+
+      if (work.exception == null)
+        return;
+      if (work.exception instanceof IOException)
+        throw (IOException) work.exception;
+      if (work.exception instanceof RuntimeException)
+        throw (RuntimeException) work.exception;
+      throw new RuntimeException(work.exception);
+    }
+  }
+
+ @Override
+ public boolean equals(Object obj) {
+ // filename is unique
+ if (obj == null)
+ return false;
+ if (obj instanceof DfsLogger)
+ return getFileName().equals(((DfsLogger) obj).getFileName());
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ // filename is unique
+ return getFileName().hashCode();
+ }
+
+  /** Source of configuration and file-system access. */
+  private final ServerResources conf;
+  /** Raw DFS stream of the log; the object {@link #sync} is invoked on. */
+  private FSDataOutputStream logFile;
+  /** Stream entries are written through; the same object as logFile when the crypto module does not wrap it. */
+  private DataOutputStream encryptingLogFile = null;
+  /** Reflectively resolved hsync/sync method of logFile. */
+  private Method sync;
+  /** Full path of the log file, chosen by open() or supplied to the constructor. */
+  private String logPath;
+
+  /** Creates a logger whose file will be created by {@link #open(String)}. */
+  public DfsLogger(ServerResources conf) throws IOException {
+    this.conf = conf;
+  }
+
+  /** Creates a logger that refers to the given file name; this constructor does not open the file. */
+  public DfsLogger(ServerResources conf, String filename) throws IOException {
+    this(conf);
+    this.logPath = filename;
+  }
+
+ public static DFSLoggerInputStreams readHeaderAndReturnStream(VolumeManager fs, Path path, AccumuloConfiguration conf) throws IOException {
+ FSDataInputStream input = fs.open(path);
+ DataInputStream decryptingInput = null;
+
+ byte[] magic = DfsLogger.LOG_FILE_HEADER_V3.getBytes();
+ byte[] magicBuffer = new byte[magic.length];
+ input.readFully(magicBuffer);
+ if (Arrays.equals(magicBuffer, magic)) {
+ // additional parameters it needs from the underlying stream.
+ String cryptoModuleClassname = input.readUTF();
+ CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(cryptoModuleClassname);
+
+ // Create the parameters and set the input stream into those parameters
+ CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
+ params.setEncryptedInputStream(input);
+
+ // Create the plaintext input stream from the encrypted one
+ params = cryptoModule.getDecryptingInputStream(params);
+
+ if (params.getPlaintextInputStream() instanceof DataInputStream) {
+ decryptingInput = (DataInputStream) params.getPlaintextInputStream();
+ } else {
+ decryptingInput = new DataInputStream(params.getPlaintextInputStream());
+ }
+ } else {
+ input.seek(0);
+ byte[] magicV2 = DfsLogger.LOG_FILE_HEADER_V2.getBytes();
+ byte[] magicBufferV2 = new byte[magic.length];
+ input.readFully(magicBufferV2);
+
+ if (Arrays.equals(magicBufferV2, magicV2)) {
+ // Log files from 1.5 dump their options in raw to the logger files. Since we don't know the class
+ // that needs to read those files, we can make a couple of basic assumptions. Either it's going to be
+ // the NullCryptoModule (no crypto) or the DefaultCryptoModule.
+
+ // If it's null, we won't have any parameters whatsoever. First, let's attempt to read
+ // parameters
+ Map<String,String> opts = new HashMap<String,String>();
+ int count = input.readInt();
+ for (int i = 0; i < count; i++) {
+ String key = input.readUTF();
+ String value = input.readUTF();
+ opts.put(key, value);
+ }
+
+ if (opts.size() == 0) {
+ // NullCryptoModule, we're done
+ decryptingInput = input;
+ } else {
+
+ // The DefaultCryptoModule will want to read the parameters from the underlying file, so we will put the file back to that spot.
+ org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory
+ .getCryptoModule(DefaultCryptoModule.class.getName());
+
+ CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
+
+ input.seek(0);
+ input.readFully(magicBuffer);
+ params.setEncryptedInputStream(input);
+
+ params = cryptoModule.getDecryptingInputStream(params);
+ if (params.getPlaintextInputStream() instanceof DataInputStream) {
+ decryptingInput = (DataInputStream) params.getPlaintextInputStream();
+ } else {
+ decryptingInput = new DataInputStream(params.getPlaintextInputStream());
+ }
+ }
+
+ } else {
+
+ input.seek(0);
+ decryptingInput = input;
+ }
+
+ }
+ return new DFSLoggerInputStreams(input, decryptingInput);
+ }
+
+  /**
+   * Creates a new, uniquely named log for the given server address. It writes the v3 header, the crypto module name
+   * and an OPEN event, syncs the file, and starts the background sync thread.
+   *
+   * @param address
+   *          server address; ':' is replaced by '+' to form the log's parent directory name
+   * @throws IOException
+   *           if the file cannot be created or initialised; a partially created file is closed first
+   */
+  public synchronized void open(String address) throws IOException {
+    String filename = UUID.randomUUID().toString();
+    String logger = StringUtil.join(Arrays.asList(address.split(":")), "+");
+
+    log.debug("DfsLogger.open() begin");
+    VolumeManager fs = conf.getFileSystem();
+
+    logPath = fs.choose(ServerConstants.getWalDirs()) + "/" + logger + "/" + filename;
+    try {
+      AccumuloConfiguration acuConf = conf.getConfiguration();
+      short replication = (short) acuConf.getCount(Property.TSERV_WAL_REPLICATION);
+      if (replication == 0)
+        replication = fs.getDefaultReplication(new Path(logPath));
+      long blockSize = acuConf.getMemoryInBytes(Property.TSERV_WAL_BLOCKSIZE);
+      if (blockSize == 0)
+        blockSize = (long) (acuConf.getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE) * 1.1);
+      if (acuConf.getBoolean(Property.TSERV_WAL_SYNC))
+        logFile = fs.createSyncable(new Path(logPath), 0, replication, blockSize);
+      else
+        logFile = fs.create(new Path(logPath), true, 0, replication, blockSize);
+
+      sync = findSyncMethod(logFile);
+
+      // Initialize the crypto operations.
+      String cryptoModuleClass = acuConf.get(Property.CRYPTO_MODULE_CLASS);
+      CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(cryptoModuleClass);
+
+      // Initialize the log file with a header and the crypto params used to set up this log file.
+      logFile.write(LOG_FILE_HEADER_V3.getBytes());
+
+      CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(acuConf);
+      params.setPlaintextOutputStream(new NoFlushOutputStream(logFile));
+
+      // Record the CryptoModule used to encipher this file, so that readHeaderAndReturnStream can bootstrap the same
+      // module, which then re-reads its own parameters.
+      logFile.writeUTF(cryptoModuleClass);
+
+      params = cryptoModule.getEncryptingOutputStream(params);
+      OutputStream encipheringOutputStream = params.getEncryptedOutputStream();
+
+      // If the module just kicks back our original stream, use it directly rather than wrapping it in another
+      // DataOutputStream.
+      if (encipheringOutputStream == logFile) {
+        encryptingLogFile = logFile;
+      } else {
+        encryptingLogFile = new DataOutputStream(encipheringOutputStream);
+      }
+
+      LogFileKey key = new LogFileKey();
+      key.event = OPEN;
+      key.tserverSession = filename;
+      key.filename = filename;
+      write(key, EMPTY);
+      sync.invoke(logFile);
+      log.debug("Got new write-ahead log: " + this);
+    } catch (Exception ex) {
+      if (logFile != null) {
+        try {
+          logFile.close();
+        } catch (IOException closeEx) {
+          // Report the failure that aborted open(), not this secondary one.
+          log.warn("Failed to close " + logPath + " after failed open", closeEx);
+        }
+      }
+      logFile = null;
+      encryptingLogFile = null;
+      throw new IOException(ex);
+    }
+
+    Thread t = new Daemon(new LogSyncingTask());
+    t.setName("Accumulo WALog thread " + toString());
+    t.start();
+  }
+
+  /**
+   * Finds the method that makes written data durable. It prefers {@code hsync} (send data to datanodes and sync it to
+   * disk) and falls back to {@code sync} (send data to datanodes).
+   */
+  private static Method findSyncMethod(FSDataOutputStream out) {
+    try {
+      return out.getClass().getMethod("hsync");
+    } catch (NoSuchMethodException ignored) {
+      // older API without hsync: fall back to sync below
+    }
+    try {
+      return out.getClass().getMethod("sync");
+    } catch (NoSuchMethodException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Returns the file name, prefixed with {@link #getLogger()} and '/' when the file name contains ':'.
+   */
+  @Override
+  public String toString() {
+    String fileName = getFileName();
+    if (!fileName.contains(":"))
+      return fileName;
+    return getLogger() + "/" + fileName;
+  }
+
+  /** Returns the full path of this log, as chosen by open() or passed to the constructor. */
+  public String getFileName() {
+    return logPath.toString();
+  }
+
+ public void close() throws IOException {
+
+ synchronized (closeLock) {
+ if (closed)
+ return;
+ // after closed is set to true, nothing else should be added to the queue
+ // CLOSED_MARKER should be the last thing on the queue, therefore when the
+ // background thread sees the marker and exits there should be nothing else
+ // to process... so nothing should be left waiting for the background
+ // thread to do work
+ closed = true;
+ workQueue.add(CLOSED_MARKER);
+ while (!workQueue.isEmpty())
+ try {
+ closeLock.wait();
+ } catch (InterruptedException e) {
+ log.info("Interrupted");
+ }
+ }
+
+ if (encryptingLogFile != null)
+ try {
+ encryptingLogFile.close();
+ } catch (IOException ex) {
+ log.error(ex);
+ throw new LogClosedException();
+ }
+ }
+
+ public synchronized void defineTablet(int seq, int tid, KeyExtent tablet) throws IOException {
+ // write this log to the METADATA table
+ final LogFileKey key = new LogFileKey();
+ key.event = DEFINE_TABLET;
+ key.seq = seq;
+ key.tid = tid;
+ key.tablet = tablet;
+ try {
+ write(key, EMPTY);
+ sync.invoke(logFile);
+ } catch (Exception ex) {
+ log.error(ex);
+ throw new IOException(ex);
+ }
+ }
+
+  /**
+   * Appends one key/value entry to the (possibly encrypting) log stream and flushes it. Callers that need durability
+   * sync the file afterwards.
+   *
+   * @param key
+   *          entry key
+   * @param value
+   *          entry value
+   * @throws IOException
+   *           if serialization or the flush fails
+   */
+  private synchronized void write(LogFileKey key, LogFileValue value) throws IOException {
+    key.write(encryptingLogFile);
+    value.write(encryptingLogFile);
+    encryptingLogFile.flush();
+  }
+
+ public LoggerOperation log(int seq, int tid, Mutation mutation) throws IOException {
+ return logManyTablets(Collections.singletonList(new TabletMutations(tid, seq, Collections.singletonList(mutation))));
+ }
+
+  /**
+   * Appends the given entries under this logger's monitor and queues a sync request covering them.
+   *
+   * @param keys
+   *          entries to append, in order
+   * @return an operation whose {@link LoggerOperation#await()} blocks until the request is processed and rethrows any
+   *         write failure recorded here
+   * @throws LogClosedException
+   *           if the log is already closed, or its channel was closed during the write
+   */
+  private LoggerOperation logFileData(List<Pair<LogFileKey,LogFileValue>> keys) throws IOException {
+    DfsLogger.LogWork work = new DfsLogger.LogWork(new CountDownLatch(1));
+    synchronized (DfsLogger.this) {
+      try {
+        for (Pair<LogFileKey,LogFileValue> pair : keys) {
+          write(pair.getFirst(), pair.getSecond());
+        }
+      } catch (ClosedChannelException ex) {
+        LogClosedException closedEx = new LogClosedException();
+        closedEx.initCause(ex);
+        throw closedEx;
+      } catch (Exception e) {
+        log.error(e, e);
+        // Recorded rather than thrown: the caller sees it from await().
+        work.exception = e;
+      }
+    }
+
+    synchronized (closeLock) {
+      // use a different lock for close check so that adding to work queue does not need
+      // to wait on walog I/O operations
+
+      if (closed)
+        throw new LogClosedException();
+      workQueue.add(work);
+    }
+
+    return new LoggerOperation(work);
+  }
+
+  /**
+   * Logs one MANY_MUTATIONS entry per element of {@code mutations}, all covered by a single sync request.
+   */
+  public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws IOException {
+    List<Pair<LogFileKey,LogFileValue>> entries = new ArrayList<Pair<LogFileKey,LogFileValue>>(mutations.size());
+    for (TabletMutations tm : mutations) {
+      LogFileKey key = new LogFileKey();
+      key.event = MANY_MUTATIONS;
+      key.seq = tm.getSeq();
+      key.tid = tm.getTid();
+      LogFileValue value = new LogFileValue();
+      value.mutations = tm.getMutations();
+      entries.add(new Pair<LogFileKey,LogFileValue>(key, value));
+    }
+    return logFileData(entries);
+  }
++
+  /**
+   * Queues a COMPACTION_FINISH event for tablet {@code tid}. {@code fqfn} is not recorded for this event.
+   */
+  public LoggerOperation minorCompactionFinished(int seq, int tid, String fqfn) throws IOException {
+    LogFileKey finish = new LogFileKey();
+    finish.event = COMPACTION_FINISH;
+    finish.tid = tid;
+    finish.seq = seq;
+    List<Pair<LogFileKey,LogFileValue>> entry = Collections.singletonList(new Pair<LogFileKey,LogFileValue>(finish, EMPTY));
+    return logFileData(entry);
+  }
+
+  /**
+   * Queues a COMPACTION_START event for tablet {@code tid}, recording the target file {@code fqfn}.
+   */
+  public LoggerOperation minorCompactionStarted(int seq, int tid, String fqfn) throws IOException {
+    LogFileKey start = new LogFileKey();
+    start.event = COMPACTION_START;
+    start.tid = tid;
+    start.seq = seq;
+    start.filename = fqfn;
+    List<Pair<LogFileKey,LogFileValue>> entry = Collections.singletonList(new Pair<LogFileKey,LogFileValue>(start, EMPTY));
+    return logFileData(entry);
+  }
+
+ public String getLogger() {
+ String parts[] = logPath.split("/");
+ return StringUtil.join(Arrays.asList(parts[parts.length - 2].split("[+]")), ":");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7c309097/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index a276a97,0000000..fb90757
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@@ -1,430 -1,0 +1,430 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.tserver.Tablet;
+import org.apache.accumulo.tserver.Tablet.CommitSession;
+import org.apache.accumulo.tserver.TabletMutations;
+import org.apache.accumulo.tserver.TabletServer;
+import org.apache.accumulo.tserver.log.DfsLogger.LoggerOperation;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Central write-ahead logging facility for a tablet server.
+ *
+ * Forwards in-memory updates to the current set of write-ahead logs, carefully writing the same data to every log,
+ * while maintaining the maximum thread parallelism for greater performance. As new logs are used and minor compactions
+ * are performed, the metadata table is kept up-to-date.
+ */
+public class TabletServerLogger {
+
+  private static final Logger log = Logger.getLogger(TabletServerLogger.class);
+
+  // Estimated bytes written to the current log set; once it exceeds maxSize the set is closed.
+  private final AtomicLong logSizeEstimate = new AtomicLong();
+  private final long maxSize;
+
+  private final TabletServer tserver;
+
+  // The current log set. It is cleared and repopulated in place, only while the write lock is held.
+  private final List<DfsLogger> loggers = new ArrayList<DfsLogger>();
+
+  // The current generation of logSet.
+  // Because multiple threads can be using a log set at one time, a log
+  // failure is likely to affect multiple threads, who will all attempt to
+  // create a new logSet. This will cause many unnecessary updates to the
+  // metadata table.
+  // We'll use this generational counter to determine if another thread has
+  // already fetched a new logSet.
+  private final AtomicInteger logSetId = new AtomicInteger();
+
+  // Use a ReadWriteLock to allow multiple threads to use the log set, but obtain a write lock to change them
+  private final ReentrantReadWriteLock logSetLock = new ReentrantReadWriteLock();
+
+  private final AtomicInteger seqGen = new AtomicInteger();
+
+  /** Returns whether write-ahead logging is enabled for the tablet's table. */
+  private static boolean enabled(Tablet tablet) {
+    return tablet.getTableConfiguration().getBoolean(Property.TABLE_WALOG_ENABLED);
+  }
+
+  /** Returns whether write-ahead logging is enabled for the tablet behind this commit session. */
+  private static boolean enabled(CommitSession commitSession) {
+    Tablet tablet = commitSession.getTablet();
+    return enabled(tablet);
+  }
+
+  /** A condition plus the work to perform under the write lock while that condition holds; used by testLockAndRun. */
+  private static abstract class TestCallWithWriteLock {
+    /** @return true if the work is needed; checked under the read lock and again under the write lock */
+    abstract boolean test();
+
+    /** Performs the work; called only while the write lock is held. */
+    abstract void withWriteLock() throws IOException;
+  }
+
+  /**
+   * Runs {@code code.withWriteLock()} only if {@code code.test()} is true under the read lock and is still true after the
+   * read lock is released and the write lock acquired (the lock is briefly released in between). The read lock is
+   * re-acquired before the write lock is released, as in the ReentrantReadWriteLock documentation.
+   *
+   * @param rwlock
+   *          lock to use
+   * @param code
+   *          a test/work pair
+   * @throws IOException
+   *           if the work throws it
+   */
+  private static void testLockAndRun(final ReadWriteLock rwlock, TestCallWithWriteLock code) throws IOException {
+    rwlock.readLock().lock();
+    try {
+      if (!code.test()) {
+        // nothing needs the write lock
+        return;
+      }
+      rwlock.readLock().unlock();
+      rwlock.writeLock().lock();
+      try {
+        // re-check: another thread may have done the work while no lock was held
+        if (code.test()) {
+          code.withWriteLock();
+        }
+      } finally {
+        // downgrade: take the read lock back before giving up the write lock
+        rwlock.readLock().lock();
+        rwlock.writeLock().unlock();
+      }
+    } finally {
+      rwlock.readLock().unlock();
+    }
+  }
+
+  /**
+   * @param tserver
+   *          server whose configuration and client address are used to create logs
+   * @param maxSize
+   *          estimated size in bytes after which the current log set is closed (new logs are created on the next write)
+   */
+  public TabletServerLogger(TabletServer tserver, long maxSize) {
+    this.maxSize = maxSize;
+    this.tserver = tserver;
+  }
+
+  /**
+   * Copies the current loggers into {@code copy}, creating a log set first if there is none.
+   *
+   * @return the id of the log set that was copied, or -1 if no loggers could be obtained
+   */
+  private int initializeLoggers(final List<DfsLogger> copy) throws IOException {
+    final int[] result = {-1};
+    testLockAndRun(logSetLock, new TestCallWithWriteLock() {
+      @Override
+      boolean test() {
+        copy.clear();
+        copy.addAll(loggers);
+        boolean noLoggers = loggers.isEmpty();
+        if (!noLoggers)
+          result[0] = logSetId.get();
+        return noLoggers;
+      }
+
+      @Override
+      void withWriteLock() throws IOException {
+        try {
+          createLoggers();
+          copy.clear();
+          copy.addAll(loggers);
+          result[0] = copy.isEmpty() ? -1 : logSetId.get();
+        } catch (IOException e) {
+          log.error("Unable to create loggers", e);
+        }
+      }
+    });
+    return result[0];
+  }
+
+ public void getLogFiles(Set<String> loggersOut) {
+ logSetLock.readLock().lock();
+ try {
+ for (DfsLogger logger : loggers) {
+ loggersOut.add(logger.getFileName());
+ }
+ } finally {
+ logSetLock.readLock().unlock();
+ }
+ }
+
+  /**
+   * Opens a fresh log, makes it the current log set and bumps the log set id. Must be called with the write lock held
+   * and no current loggers.
+   */
+  synchronized private void createLoggers() throws IOException {
+    if (!logSetLock.isWriteLockedByCurrentThread())
+      throw new IllegalStateException("createLoggers should be called with write lock held!");
+    if (!loggers.isEmpty())
+      throw new IllegalStateException("createLoggers should not be called when loggers.size() is " + loggers.size());
+
+    try {
+      DfsLogger alog = new DfsLogger(tserver.getServerConfig());
+      alog.open(tserver.getClientAddressString());
+      loggers.add(alog);
+      logSetId.incrementAndGet();
+    } catch (Exception t) {
+      // Unchecked on purpose: write() treats it as an unexpected error, backs off and retries.
+      throw new RuntimeException(t);
+    }
+  }
+
+ public void resetLoggers() throws IOException {
+ logSetLock.writeLock().lock();
+ try {
+ close();
+ } finally {
+ logSetLock.writeLock().unlock();
+ }
+ }
+
+  /**
+   * Closes every current logger (best effort), then clears the log set and the size estimate. Must be called with the
+   * write lock held.
+   *
+   * @throws IOException
+   *           wrapping any unexpected failure while clearing the log set
+   */
+  synchronized private void close() throws IOException {
+    if (!logSetLock.isWriteLockedByCurrentThread()) {
+      throw new IllegalStateException("close should be called with write lock held!");
+    }
+    try {
+      for (DfsLogger logger : loggers) {
+        try {
+          logger.close();
+        } catch (DfsLogger.LogClosedException ex) {
+          // ignore. NOTE(review): DfsLogger.close also throws this when closing its stream fails.
+        } catch (Throwable ex) {
+          // keep closing the rest, but keep the stack trace of this failure
+          log.error("Unable to cleanly close log " + logger.getFileName() + ": " + ex, ex);
+        }
+      }
+      loggers.clear();
+      logSizeEstimate.set(0);
+    } catch (Throwable t) {
+      throw new IOException(t);
+    }
+  }
+
+  /** One write against a single logger; returns an operation to await, or null when there is nothing to wait for. */
+  interface Writer {
+    LoggerOperation write(DfsLogger logger, int seq) throws Exception;
+  }
+
+  /** Single-session convenience overload of {@link #write(Collection, boolean, Writer)}. */
+  private int write(CommitSession commitSession, boolean mincFinish, Writer writer) throws IOException {
+    return write(Collections.singletonList(commitSession), mincFinish, writer);
+  }
+
+ /**
+  * Performs {@code writer} against every logger in the current log set. It retries until one attempt completes against
+  * a log set whose id did not change while the attempt was in flight.
+  *
+  * When {@code beginUpdatingLogsUsed} returns true for a session (per the comment below: the tablet has not used this
+  * log set yet), the tablet is first defined in the log via defineTablet (which re-enters this method), and the loggers
+  * are added to the metadata table. After a successful write, the log set is closed if the size estimate exceeds maxSize.
+  *
+  * @param sessions commit sessions whose tablets are written
+  * @param mincFinish passed through to {@code CommitSession.beginUpdatingLogsUsed}
+  * @param writer performs the write against one logger and returns an operation to await, or null
+  * @return the sequence number assigned to the successful attempt
+  * @throws IOException if closing the log set (after a failure, or because it grew too large) fails
+  */
+ private int write(Collection<CommitSession> sessions, boolean mincFinish, Writer writer) throws IOException {
+ // Work very hard not to lock this during calls to the outside world
+ int currentLogSet = logSetId.get();
+
+ int seq = -1;
+
+ int attempt = 0;
+ boolean success = false;
+ // Retry without limit; each failed attempt may reset the log set before the next one.
+ while (!success) {
+ try {
+ // get a reference to the loggers that no other thread can touch
+ ArrayList<DfsLogger> copy = new ArrayList<DfsLogger>();
+ currentLogSet = initializeLoggers(copy);
+
+ // add the logger to the log set for the memory in the tablet,
+ // update the metadata table if we've never used this tablet
+
+ if (currentLogSet == logSetId.get()) {
+ for (CommitSession commitSession : sessions) {
+ if (commitSession.beginUpdatingLogsUsed(copy, mincFinish)) {
+ try {
+ // Scribble out a tablet definition and then write to the metadata table
+ defineTablet(commitSession);
+ if (currentLogSet == logSetId.get())
+ tserver.addLoggersToMetadata(copy, commitSession.getExtent(), commitSession.getLogId());
+ } finally {
+ commitSession.finishUpdatingLogsUsed();
+ }
+ }
+ }
+ }
+
+ // Make sure that the logs haven't changed out from underneath our copy
+ if (currentLogSet == logSetId.get()) {
+
+ // write the mutation to the logs
+ seq = seqGen.incrementAndGet();
+ if (seq < 0)
+ throw new RuntimeException("Logger sequence generator wrapped! Onos!!!11!eleven");
+ // Issue the write to every logger first, then wait for all of them, so the syncs can overlap.
+ ArrayList<LoggerOperation> queuedOperations = new ArrayList<LoggerOperation>(copy.size());
+ for (DfsLogger wal : copy) {
+ LoggerOperation lop = writer.write(wal, seq);
+ if (lop != null)
+ queuedOperations.add(lop);
+ }
+
+ for (LoggerOperation lop : queuedOperations) {
+ lop.await();
+ }
+
+ // double-check: did the log set change?
+ success = (currentLogSet == logSetId.get());
+ }
+ } catch (DfsLogger.LogClosedException ex) {
+ log.debug("Logs closed while writing, retrying " + (attempt + 1));
+ } catch (Exception t) {
+ log.error("Unexpected error writing to log, retrying attempt " + (attempt + 1), t);
+ // brief back-off before the next attempt
+ UtilWaitThread.sleep(100);
+ } finally {
+ attempt++;
+ }
+ // Some sort of write failure occurred. Grab the write lock and reset the logs.
+ // But since multiple threads will attempt it, only attempt the reset when
+ // the logs haven't changed.
+ // (The reset below only runs when !success.)
+ final int finalCurrent = currentLogSet;
+ if (!success) {
+ testLockAndRun(logSetLock, new TestCallWithWriteLock() {
+
+ @Override
+ boolean test() {
+ return finalCurrent == logSetId.get();
+ }
+
+ @Override
+ void withWriteLock() throws IOException {
+ close();
+ }
+ });
+ }
+ }
+ // if the log gets too big, reset it .. grab the write lock first
+ logSizeEstimate.addAndGet(4 * 3); // event, tid, seq overhead
+ testLockAndRun(logSetLock, new TestCallWithWriteLock() {
+ boolean test() {
+ return logSizeEstimate.get() > maxSize;
+ }
+
+ void withWriteLock() throws IOException {
+ close();
+ }
+ });
+ return seq;
+ }
+
+ public int defineTablet(final CommitSession commitSession) throws IOException {
+ // scribble this into the metadata tablet, too.
+ if (!enabled(commitSession))
+ return -1;
+ return write(commitSession, false, new Writer() {
+ @Override
+ public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
+ logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(), commitSession.getExtent());
+ return null;
+ }
+ });
+ }
+
+ public int log(final CommitSession commitSession, final int tabletSeq, final Mutation m) throws IOException {
+ if (!enabled(commitSession))
+ return -1;
+ int seq = write(commitSession, false, new Writer() {
+ @Override
+ public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
+ return logger.log(tabletSeq, commitSession.getLogId(), m);
+ }
+ });
+ logSizeEstimate.addAndGet(m.numBytes());
+ return seq;
+ }
+
+  /**
+   * Logs mutations for several tablets as one write, skipping tablets whose table has write-ahead logging disabled.
+   *
+   * @return the sequence number of the write, or -1 if no tablet needed logging
+   * @throws IllegalArgumentException
+   *           if a tablet to be logged has an empty mutation list; checked before anything is written
+   */
+  public int logManyTablets(Map<CommitSession,List<Mutation>> mutations) throws IOException {
+
+    final Map<CommitSession,List<Mutation>> loggables = new HashMap<CommitSession,List<Mutation>>(mutations);
+    for (CommitSession t : mutations.keySet()) {
+      if (!enabled(t))
+        loggables.remove(t);
+    }
+    if (loggables.isEmpty())
+      return -1;
+
+    // Validate up front: previously the check ran only after the data had already been written to the log.
+    for (List<Mutation> entry : loggables.values()) {
+      if (entry.size() < 1)
+        throw new IllegalArgumentException("logManyTablets: logging empty mutation list");
+    }
+
+    int seq = write(loggables.keySet(), false, new Writer() {
+      @Override
+      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
+        List<TabletMutations> copy = new ArrayList<TabletMutations>(loggables.size());
+        for (Entry<CommitSession,List<Mutation>> entry : loggables.entrySet()) {
+          CommitSession cs = entry.getKey();
+          copy.add(new TabletMutations(cs.getLogId(), cs.getWALogSeq(), entry.getValue()));
+        }
+        return logger.logManyTablets(copy);
+      }
+    });
+    for (List<Mutation> entry : loggables.values()) {
+      for (Mutation m : entry) {
+        logSizeEstimate.addAndGet(m.numBytes());
+      }
+    }
+    return seq;
+  }
+
+  /**
+   * Logs a minor-compaction-finished event for the session's tablet and records how long the write took. It does
+   * nothing when write-ahead logging is disabled for the table.
+   */
+  public void minorCompactionFinished(final CommitSession commitSession, final String fullyQualifiedFileName, final int walogSeq) throws IOException {
+
+    if (!enabled(commitSession))
+      return;
+
+    long t1 = System.currentTimeMillis();
+
+    int seq = write(commitSession, true, new Writer() {
+      @Override
+      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
+        // Hand the operation back: write() issues to every logger first and then awaits them all,
+        // instead of blocking on each logger in turn.
+        return logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), fullyQualifiedFileName);
+      }
+    });
+
+    long t2 = System.currentTimeMillis();
+
+    log.debug(" wrote MinC finish " + seq + ": writeTime:" + (t2 - t1) + "ms ");
+  }
+
+  /**
+   * Logs a minor-compaction-started event, naming the target file, for the session's tablet.
+   *
+   * @return {@code seq} as passed in, or -1 if write-ahead logging is disabled for the table
+   */
+  public int minorCompactionStarted(final CommitSession commitSession, final int seq, final String fullyQualifiedFileName) throws IOException {
+    if (!enabled(commitSession))
+      return -1;
+    write(commitSession, false, new Writer() {
+      @Override
+      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
+        // Hand the operation back so write() awaits it together with the other loggers.
+        return logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName);
+      }
+    });
+    return seq;
+  }
+
+  /**
+   * Recovers the tablet's logged data from {@code logs} using SortedLogRecovery, passing {@code tabletFiles} and
+   * {@code mr} through to it. It does nothing when write-ahead logging is disabled for the table.
+   *
+   * @throws IOException
+   *           wrapping any failure during recovery
+   */
+  public void recover(VolumeManager fs, Tablet tablet, List<Path> logs, Set<String> tabletFiles, MutationReceiver mr) throws IOException {
+    if (!enabled(tablet))
+      return;
+    try {
+      new SortedLogRecovery(fs).recover(tablet.getExtent(), logs, tabletFiles, mr);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+}