Posted to commits@accumulo.apache.org by el...@apache.org on 2014/10/03 01:19:29 UTC

[1/4] git commit: ACCUMULO-3182 Ensures that a WAL with a partial header (or a completely empty WAL) can be successfully recovered.

Repository: accumulo
Updated Branches:
  refs/heads/1.6 65acebbf9 -> a65f1234e
  refs/heads/master 9a8554f9c -> 089408d59


ACCUMULO-3182 Ensures that a WAL with a partial header (or a completely empty WAL) can be successfully recovered.

In the case where a WAL is observed which has a partial or missing header,
the file is treated as "empty": no attempt is made to read and replay
log entries from it. This should restore the normal functionality
that was observed with 1.5 WALs.
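
For context, a minimal, self-contained sketch of the failure mode this change
handles. This is illustrative only and not the Accumulo API; the real logic
lives in DfsLogger.readHeaderAndReturnStream in the diff below, where the
EOFException is rethrown as a LogHeaderIncompleteException:

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;

public class WalHeaderCheck {
  static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---";

  // Returns false when the file ends before a complete V3 header could be
  // read, which is what a tabletserver dying mid-write can leave behind.
  public static boolean hasCompleteHeader(String walFile) throws IOException {
    byte[] magic = LOG_FILE_HEADER_V3.getBytes();
    byte[] buffer = new byte[magic.length];
    try (DataInputStream in = new DataInputStream(new FileInputStream(walFile))) {
      in.readFully(buffer); // throws EOFException on a truncated header
      return Arrays.equals(buffer, magic);
    } catch (EOFException e) {
      return false; // partial (or empty) header: treat the WAL as empty
    }
  }
}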

An integration test was introduced which verifies that neither an
empty WAL nor a WAL with a partial header blocks recovery of the
tablet which references the "bad" WAL.
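
As a condensed illustration of the test setup, the snippet below fabricates a
WAL with only half of its header written, the same way the new IT does. The
output path here is hypothetical; the IT writes under the MiniAccumuloCluster's
walogs directory and relies on DfsLogger.LOG_FILE_HEADER_V3 now being public:

import org.apache.accumulo.tserver.log.DfsLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PartialHeaderWalExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // Hypothetical location, for illustration only.
    Path partialHeaderWalog = new Path("/tmp/partial-header-walog");

    // Write only the first half of the V3 header, simulating a tserver
    // that died before the header was completely written.
    FSDataOutputStream wal = fs.create(partialHeaderWalog);
    wal.write(DfsLogger.LOG_FILE_HEADER_V3.getBytes(), 0, DfsLogger.LOG_FILE_HEADER_V3.length() / 2);
    wal.close();
  }
}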

Additionally, an enum was introduced to replace the hard-coded WAL
recovery status marker file names: "finished" and "failed".
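
A brief usage sketch of the new enum (the class itself appears in full in the
diff below; the recovery directory used here is hypothetical):

import org.apache.accumulo.server.log.SortedLogState;
import org.apache.hadoop.fs.Path;

public class SortedLogStateExample {
  public static void main(String[] args) {
    // Hypothetical sorted-recovery directory.
    Path sortedDir = new Path("/accumulo/recovery/some-wal");

    // Callers ask the enum for marker paths instead of hard-coding strings.
    Path finished = SortedLogState.getFinishedMarkerPath(sortedDir);
    System.out.println(finished);                                       // .../finished
    System.out.println(SortedLogState.isFinished(finished.getName()));  // true
    System.out.println(SortedLogState.FAILED.getMarker());              // "failed"
  }
}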


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d65e0e32
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d65e0e32
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d65e0e32

Branch: refs/heads/1.6
Commit: d65e0e32de29206b627e578b957ce66c0d644cea
Parents: 3c90ee9
Author: Josh Elser <el...@apache.org>
Authored: Thu Oct 2 19:04:44 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 2 19:04:44 2014 -0400

----------------------------------------------------------------------
 .../accumulo/server/log/SortedLogState.java     |  66 ++++++
 .../master/recovery/RecoveryManager.java        |   3 +-
 .../apache/accumulo/tserver/TabletServer.java   |   9 +-
 .../apache/accumulo/tserver/log/DfsLogger.java  | 157 ++++++++------
 .../apache/accumulo/tserver/log/LogSorter.java  |  36 ++-
 .../accumulo/tserver/log/MultiReader.java       |  34 +--
 .../accumulo/tserver/logger/LogReader.java      |  13 +-
 .../tserver/log/SortedLogRecoveryTest.java      |   3 +-
 .../tserver/log/TestUpgradePathForWALogs.java   |   3 +-
 .../MissingWalHeaderCompletesRecoveryIT.java    | 217 +++++++++++++++++++
 10 files changed, 443 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/server/base/src/main/java/org/apache/accumulo/server/log/SortedLogState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/log/SortedLogState.java b/server/base/src/main/java/org/apache/accumulo/server/log/SortedLogState.java
new file mode 100644
index 0000000..f337cd8
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/log/SortedLogState.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.log;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A file is written in the destination directory for the sorting of write-ahead logs that need recovering. The value of {@link #marker} is the name of the file
+ * that will exist in the sorted output directory.
+ */
+public enum SortedLogState {
+  FINISHED("finished"), FAILED("failed");
+
+  private String marker;
+
+  private SortedLogState(String marker) {
+    this.marker = marker;
+  }
+
+  public String getMarker() {
+    return marker;
+  }
+
+  public static boolean isFinished(String fileName) {
+    return FINISHED.getMarker().equals(fileName);
+  }
+
+  public static boolean isFailed(String fileName) {
+    return FAILED.getMarker().equals(fileName);
+  }
+
+  public static Path getFinishedMarkerPath(String rootPath) {
+    return new Path(rootPath, FINISHED.getMarker());
+  }
+
+  public static Path getFinishedMarkerPath(Path rootPath) {
+    return new Path(rootPath, FINISHED.getMarker());
+  }
+
+  public static Path getFailedMarkerPath(String rootPath) {
+    return new Path(rootPath, FAILED.getMarker());
+  }
+  
+  public static Path getFailedMarkerPath(Path rootPath) {
+    return new Path(rootPath, FAILED.getMarker());
+  }
+
+  @Override
+  public String toString() {
+    return marker;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java b/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
index 76d3520..0c2e1f0 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
@@ -38,6 +38,7 @@ import org.apache.accumulo.master.Master;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.fs.VolumeManager.FileType;
 import org.apache.accumulo.server.fs.VolumeUtil;
+import org.apache.accumulo.server.log.SortedLogState;
 import org.apache.accumulo.server.master.recovery.HadoopLogCloser;
 import org.apache.accumulo.server.master.recovery.LogCloser;
 import org.apache.accumulo.server.master.recovery.RecoveryPath;
@@ -154,7 +155,7 @@ public class RecoveryManager {
           }
         }
 
-        if (master.getFileSystem().exists(new Path(dest, "finished"))) {
+        if (master.getFileSystem().exists(SortedLogState.getFinishedMarkerPath(dest))) {
           synchronized (this) {
             closeTasksQueued.remove(sortId);
             recoveryDelay.remove(sortId);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 03dc86d..2551bd0 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -161,6 +161,7 @@ import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManager.FileType;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.fs.VolumeUtil;
+import org.apache.accumulo.server.log.SortedLogState;
 import org.apache.accumulo.server.master.recovery.RecoveryPath;
 import org.apache.accumulo.server.master.state.Assignment;
 import org.apache.accumulo.server.master.state.DistributedStoreException;
@@ -250,7 +251,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
   private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
   private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000;
   private static final long TIME_BETWEEN_GC_CHECKS = 5000;
-  
+
   private TabletServerLogger logger;
 
   protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics();
@@ -336,7 +337,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     if (sawChange) {
       log.debug(sb.toString());
     }
-    
+
     final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
     if (lastMemoryCheckTime > 0 && lastMemoryCheckTime < now) {
       long diff = now - lastMemoryCheckTime;
@@ -347,7 +348,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
       lastMemoryCheckTime = now;
       return;
     }
-    
+
     if (maxIncreaseInCollectionTime > keepAliveTimeout) {
       Halt.halt("Garbage collection may be interfering with lock keep-alive.  Halting.", -1);
     }
@@ -3700,7 +3701,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
       Path recovery = null;
       for (String log : entry.logSet) {
         Path finished = RecoveryPath.getRecoveryPath(fs, fs.getFullPath(FileType.WAL, log));
-        finished = new Path(finished, "finished");
+        finished = SortedLogState.getFinishedMarkerPath(finished);
         TabletServer.log.info("Looking for " + finished);
         if (fs.exists(finished)) {
           recovery = finished.getParent();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index b152380..2bd0b47 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -24,6 +24,7 @@ import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.lang.reflect.InvocationTargetException;
@@ -67,12 +68,11 @@ import org.apache.log4j.Logger;
 
 /**
  * Wrap a connection to a logger.
- * 
+ *
  */
 public class DfsLogger {
-  // Package private so that LogSorter can find this
-  static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---";
-  static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---";
+  public static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---";
+  public static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---";
 
   private static Logger log = Logger.getLogger(DfsLogger.class);
 
@@ -84,6 +84,26 @@ public class DfsLogger {
     }
   }
 
+  /**
+   * A well-timed tabletserver failure could result in an incomplete header written to a write-ahead log. This exception is thrown when the header cannot be
+   * read from a WAL which should only happen when the tserver dies as described.
+   */
+  public static class LogHeaderIncompleteException extends IOException {
+    private static final long serialVersionUID = 1l;
+
+    public LogHeaderIncompleteException(String msg) {
+      super(msg);
+    }
+
+    public LogHeaderIncompleteException(String msg, Throwable cause) {
+      super(msg, cause);
+    }
+
+    public LogHeaderIncompleteException(Throwable cause) {
+      super(cause);
+    }
+  }
+
   public static class DFSLoggerInputStreams {
 
     private FSDataInputStream originalInput;
@@ -229,7 +249,9 @@ public class DfsLogger {
 
   /**
    * Refernce a pre-existing log file.
-   * @param meta the cq for the "log" entry in +r/!0
+   *
+   * @param meta
+   *          the cq for the "log" entry in +r/!0
    */
   public DfsLogger(ServerResources conf, String filename, String meta) throws IOException {
     this.conf = conf;
@@ -243,75 +265,82 @@ public class DfsLogger {
 
     byte[] magic = DfsLogger.LOG_FILE_HEADER_V3.getBytes();
     byte[] magicBuffer = new byte[magic.length];
-    input.readFully(magicBuffer);
-    if (Arrays.equals(magicBuffer, magic)) {
-      // additional parameters it needs from the underlying stream.
-      String cryptoModuleClassname = input.readUTF();
-      CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(cryptoModuleClassname);
+    try {
+      input.readFully(magicBuffer);
+      if (Arrays.equals(magicBuffer, magic)) {
+        // additional parameters it needs from the underlying stream.
+        String cryptoModuleClassname = input.readUTF();
+        CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(cryptoModuleClassname);
 
-      // Create the parameters and set the input stream into those parameters
-      CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
-      params.setEncryptedInputStream(input);
+        // Create the parameters and set the input stream into those parameters
+        CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
+        params.setEncryptedInputStream(input);
 
-      // Create the plaintext input stream from the encrypted one
-      params = cryptoModule.getDecryptingInputStream(params);
+        // Create the plaintext input stream from the encrypted one
+        params = cryptoModule.getDecryptingInputStream(params);
 
-      if (params.getPlaintextInputStream() instanceof DataInputStream) {
-        decryptingInput = (DataInputStream) params.getPlaintextInputStream();
-      } else {
-        decryptingInput = new DataInputStream(params.getPlaintextInputStream());
-      }
-    } else {
-      input.seek(0);
-      byte[] magicV2 = DfsLogger.LOG_FILE_HEADER_V2.getBytes();
-      byte[] magicBufferV2 = new byte[magicV2.length];
-      input.readFully(magicBufferV2);
-
-      if (Arrays.equals(magicBufferV2, magicV2)) {
-        // Log files from 1.5 dump their options in raw to the logger files. Since we don't know the class
-        // that needs to read those files, we can make a couple of basic assumptions. Either it's going to be
-        // the NullCryptoModule (no crypto) or the DefaultCryptoModule.
-
-        // If it's null, we won't have any parameters whatsoever. First, let's attempt to read
-        // parameters
-        Map<String,String> opts = new HashMap<String,String>();
-        int count = input.readInt();
-        for (int i = 0; i < count; i++) {
-          String key = input.readUTF();
-          String value = input.readUTF();
-          opts.put(key, value);
+        if (params.getPlaintextInputStream() instanceof DataInputStream) {
+          decryptingInput = (DataInputStream) params.getPlaintextInputStream();
+        } else {
+          decryptingInput = new DataInputStream(params.getPlaintextInputStream());
         }
+      } else {
+        input.seek(0);
+        byte[] magicV2 = DfsLogger.LOG_FILE_HEADER_V2.getBytes();
+        byte[] magicBufferV2 = new byte[magicV2.length];
+        input.readFully(magicBufferV2);
+
+        if (Arrays.equals(magicBufferV2, magicV2)) {
+          // Log files from 1.5 dump their options in raw to the logger files. Since we don't know the class
+          // that needs to read those files, we can make a couple of basic assumptions. Either it's going to be
+          // the NullCryptoModule (no crypto) or the DefaultCryptoModule.
+
+          // If it's null, we won't have any parameters whatsoever. First, let's attempt to read
+          // parameters
+          Map<String,String> opts = new HashMap<String,String>();
+          int count = input.readInt();
+          for (int i = 0; i < count; i++) {
+            String key = input.readUTF();
+            String value = input.readUTF();
+            opts.put(key, value);
+          }
 
-        if (opts.size() == 0) {
-          // NullCryptoModule, we're done
-          decryptingInput = input;
-        } else {
+          if (opts.size() == 0) {
+            // NullCryptoModule, we're done
+            decryptingInput = input;
+          } else {
 
-          // The DefaultCryptoModule will want to read the parameters from the underlying file, so we will put the file back to that spot.
-          org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory
-              .getCryptoModule(DefaultCryptoModule.class.getName());
+            // The DefaultCryptoModule will want to read the parameters from the underlying file, so we will put the file back to that spot.
+            org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory
+                .getCryptoModule(DefaultCryptoModule.class.getName());
 
-          CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
+            CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
 
-          input.seek(0);
-          input.readFully(magicBufferV2);
-          params.setEncryptedInputStream(input);
+            input.seek(0);
+            input.readFully(magicBufferV2);
+            params.setEncryptedInputStream(input);
 
-          params = cryptoModule.getDecryptingInputStream(params);
-          if (params.getPlaintextInputStream() instanceof DataInputStream) {
-            decryptingInput = (DataInputStream) params.getPlaintextInputStream();
-          } else {
-            decryptingInput = new DataInputStream(params.getPlaintextInputStream());
+            params = cryptoModule.getDecryptingInputStream(params);
+            if (params.getPlaintextInputStream() instanceof DataInputStream) {
+              decryptingInput = (DataInputStream) params.getPlaintextInputStream();
+            } else {
+              decryptingInput = new DataInputStream(params.getPlaintextInputStream());
+            }
           }
-        }
 
-      } else {
+        } else {
 
-        input.seek(0);
-        decryptingInput = input;
-      }
+          input.seek(0);
+          decryptingInput = input;
+        }
 
+      }
+    } catch (EOFException e) {
+      log.warn("Got EOFException trying to read WAL header information, assuming the rest of the file (" + path + ") has no data.");
+      // A TabletServer might have died before the (complete) header was written
+      throw new LogHeaderIncompleteException(e);
     }
+
     return new DFSLoggerInputStreams(input, decryptingInput);
   }
 
@@ -438,7 +467,7 @@ public class DfsLogger {
     }
 
     // wait for background thread to finish before closing log file
-    if(syncThread != null){
+    if (syncThread != null) {
       try {
         syncThread.join();
       } catch (InterruptedException e) {
@@ -446,12 +475,12 @@ public class DfsLogger {
       }
     }
 
-    //expect workq should be empty at this point
-    if(workQueue.size() != 0){
+    // expect workq should be empty at this point
+    if (workQueue.size() != 0) {
       log.error("WAL work queue not empty after sync thread exited");
       throw new IllegalStateException("WAL work queue not empty after sync thread exited");
     }
-    
+
     if (encryptingLogFile != null)
       try {
         logFile.close();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index bb8e3c7..6095f88 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -37,9 +37,11 @@ import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.SortedLogState;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
 import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams;
+import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -50,7 +52,7 @@ import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
 
 /**
- * 
+ *
  */
 public class LogSorter {
 
@@ -110,7 +112,19 @@ public class LogSorter {
         // the following call does not throw an exception if the file/dir does not exist
         fs.deleteRecursively(new Path(destPath));
 
-        DFSLoggerInputStreams inputStreams = DfsLogger.readHeaderAndReturnStream(fs, srcPath, conf);
+        DFSLoggerInputStreams inputStreams;
+        try {
+          inputStreams = DfsLogger.readHeaderAndReturnStream(fs, srcPath, conf);
+        } catch (LogHeaderIncompleteException e) {
+          log.warn("Could not read header from write-ahead log " + srcPath + ". Not sorting.");
+          // Creating a 'finished' marker will cause recovery to proceed normally and the
+          // empty file will be correctly ignored downstream.
+          fs.mkdirs(new Path(destPath));
+          writeBuffer(destPath, Collections.<Pair<LogFileKey,LogFileValue>> emptyList(), part++);
+          fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
+          return;
+        }
+
         this.input = inputStreams.getOriginalInput();
         this.decryptingInput = inputStreams.getDecryptingInputStream();
 
@@ -140,7 +154,7 @@ public class LogSorter {
         try {
           // parent dir may not exist
           fs.mkdirs(new Path(destPath));
-          fs.create(new Path(destPath, "failed")).close();
+          fs.create(SortedLogState.getFailedMarkerPath(destPath)).close();
         } catch (IOException e) {
           log.error("Error creating failed flag file " + name, e);
         }
@@ -158,10 +172,10 @@ public class LogSorter {
       }
     }
 
-    private void writeBuffer(String destPath, ArrayList<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
+    private void writeBuffer(String destPath, List<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
       Path path = new Path(destPath, String.format("part-r-%05d", part++));
       FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
-      
+
       @SuppressWarnings("deprecation")
       MapFile.Writer output = new MapFile.Writer(ns.getConf(), ns, path.toString(), LogFileKey.class, LogFileValue.class);
       try {
@@ -180,10 +194,14 @@ public class LogSorter {
     }
 
     synchronized void close() throws IOException {
-      bytesCopied = input.getPos();
-      input.close();
-      decryptingInput.close();
-      input = null;
+      // If we receive an empty or malformed-header WAL, we won't
+      // have input streams that need closing. Avoid the NPE.
+      if (null != input) {
+        bytesCopied = input.getPos();
+        input.close();
+        decryptingInput.close();
+        input = null;
+      }
     }
 
     public synchronized long getSortTime() {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/server/tserver/src/main/java/org/apache/accumulo/tserver/log/MultiReader.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/MultiReader.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/MultiReader.java
index 541f075..c2a0683 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/MultiReader.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/MultiReader.java
@@ -20,6 +20,7 @@ import java.io.EOFException;
 import java.io.IOException;
 
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.SortedLogState;
 import org.apache.commons.collections.buffer.PriorityBuffer;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -32,23 +33,23 @@ import org.apache.hadoop.io.WritableComparable;
 
 /**
  * Provide simple Map.Reader methods over multiple Maps.
- * 
+ *
  * Presently only supports next() and seek() and works on all the Map directories within a directory. The primary purpose of this class is to merge the results
  * of multiple Reduce jobs that result in Map output files.
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class MultiReader {
-  
+
   /**
    * Group together the next key/value from a Reader with the Reader
-   * 
+   *
    */
   private static class Index implements Comparable<Index> {
     Reader reader;
     WritableComparable key;
     Writable value;
     boolean cached = false;
-    
+
     private static Object create(java.lang.Class<?> klass) {
       try {
         return klass.getConstructor().newInstance();
@@ -56,19 +57,20 @@ public class MultiReader {
         throw new RuntimeException("Unable to construct objects to use for comparison");
       }
     }
-    
+
     public Index(Reader reader) {
       this.reader = reader;
       key = (WritableComparable) create(reader.getKeyClass());
       value = (Writable) create(reader.getValueClass());
     }
-    
+
     private void cache() throws IOException {
       if (!cached && reader.next(key, value)) {
         cached = true;
       }
     }
-    
+
+    @Override
     public int compareTo(Index o) {
       try {
         cache();
@@ -84,16 +86,16 @@ public class MultiReader {
       }
     }
   }
-  
+
   private PriorityBuffer heap = new PriorityBuffer();
-  
+
   @SuppressWarnings("deprecation")
   public MultiReader(VolumeManager fs, Path directory) throws IOException {
     boolean foundFinish = false;
     for (FileStatus child : fs.listStatus(directory)) {
       if (child.getPath().getName().startsWith("_"))
         continue;
-      if (child.getPath().getName().equals("finished")) {
+      if (SortedLogState.isFinished(child.getPath().getName())) {
         foundFinish = true;
         continue;
       }
@@ -101,9 +103,9 @@ public class MultiReader {
       heap.add(new Index(new Reader(ns, child.getPath().toString(), ns.getConf())));
     }
     if (!foundFinish)
-      throw new IOException("Sort \"finished\" flag not found in " + directory);
+      throw new IOException("Sort \"" + SortedLogState.FINISHED.getMarker() + "\" flag not found in " + directory);
   }
-  
+
   private static void copy(Writable src, Writable dest) throws IOException {
     // not exactly efficient...
     DataOutputBuffer output = new DataOutputBuffer();
@@ -112,7 +114,7 @@ public class MultiReader {
     input.reset(output.getData(), output.getLength());
     dest.readFields(input);
   }
-  
+
   public synchronized boolean next(WritableComparable key, Writable val) throws IOException {
     Index elt = (Index) heap.remove();
     try {
@@ -129,7 +131,7 @@ public class MultiReader {
     }
     return true;
   }
-  
+
   public synchronized boolean seek(WritableComparable key) throws IOException {
     PriorityBuffer reheap = new PriorityBuffer(heap.size());
     boolean result = false;
@@ -149,7 +151,7 @@ public class MultiReader {
     heap = reheap;
     return result;
   }
-  
+
   public void close() throws IOException {
     IOException problem = null;
     for (Object obj : heap) {
@@ -164,5 +166,5 @@ public class MultiReader {
       throw problem;
     heap = null;
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
index a1229e7..c3f4fd0 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
@@ -35,14 +35,17 @@ import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.tserver.log.DfsLogger;
 import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams;
+import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
 import org.apache.accumulo.tserver.log.MultiReader;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 
 public class LogReader {
+  private static final Logger log = Logger.getLogger(LogReader.class);
 
   static class Opts extends Help {
     @Parameter(names = "-r", description = "print only mutations associated with the given row")
@@ -59,7 +62,7 @@ public class LogReader {
 
   /**
    * Dump a Log File (Map or Sequence) to stdout. Will read from HDFS or local file system.
-   * 
+   *
    * @param args
    *          - first argument is the file to print
    */
@@ -96,7 +99,13 @@ public class LogReader {
 
       if (fs.isFile(path)) {
         // read log entries from a simple hdfs file
-        DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fs, path, ServerConfiguration.getSiteConfiguration());
+        DFSLoggerInputStreams streams;
+        try {
+          streams = DfsLogger.readHeaderAndReturnStream(fs, path, ServerConfiguration.getSiteConfiguration());
+        } catch (LogHeaderIncompleteException e) {
+          log.warn("Could not read header for " + path + ". Ignoring...");
+          continue;
+        }
         DataInputStream input = streams.getDecryptingInputStream();
 
         try {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
index fffa15e..03361d1 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
@@ -40,6 +40,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.server.data.ServerMutation;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.log.SortedLogState;
 import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
@@ -132,7 +133,7 @@ public class SortedLogRecoveryTest {
           map.append(lfe.key, lfe.value);
         }
         map.close();
-        ns.create(new Path(path, "finished")).close();
+        ns.create(SortedLogState.getFinishedMarkerPath(path)).close();
         dirs.add(new Path(path));
       }
       // Recover

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
index d6c23e3..1f2ce6a 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
@@ -26,6 +26,7 @@ import java.io.OutputStream;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.log.SortedLogState;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.Path;
@@ -66,7 +67,7 @@ public class TestUpgradePathForWALogs {
     fs = VolumeManagerImpl.getLocal(path);
     Path manyMapsPath = new Path("file://" + path);
     fs.mkdirs(manyMapsPath);
-    fs.create(new Path(manyMapsPath, "finished")).close();
+    fs.create(SortedLogState.getFinishedMarkerPath(manyMapsPath)).close();
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java b/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
new file mode 100644
index 0000000..7f2f6f9
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.accumulo.tserver.log.DfsLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterables;
+
+/**
+ *
+ */
+public class MissingWalHeaderCompletesRecoveryIT extends ConfigurableMacIT {
+  private static final Logger log = LoggerFactory.getLogger(MissingWalHeaderCompletesRecoveryIT.class);
+
+  private static boolean rootHasWritePermission;
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 2 * 60;
+  }
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration conf) {
+    cfg.setNumTservers(1);
+    cfg.setProperty(Property.MASTER_RECOVERY_DELAY, "1s");
+    // Make sure the GC doesn't delete the file before the metadata reference is added
+    cfg.setProperty(Property.GC_CYCLE_START, "999999s");
+    conf.set("fs.file.impl", RawLocalFileSystem.class.getName());
+  }
+
+  @Before
+  public void setupMetadataPermission() throws Exception {
+    Connector conn = getConnector();
+    rootHasWritePermission = conn.securityOperations().hasTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+    if (!rootHasWritePermission) {
+      conn.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+      // Make sure it propagates through ZK
+      Thread.sleep(5000);
+    }
+  }
+
+  @After
+  public void resetMetadataPermission() throws Exception {
+    Connector conn = getConnector();
+    // Final state doesn't match the original
+    if (rootHasWritePermission != conn.securityOperations().hasTablePermission("root", MetadataTable.NAME, TablePermission.WRITE)) {
+      if (rootHasWritePermission) {
+        // root had write permission when starting, ensure root still does
+        conn.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+      } else {
+        // root did not have write permission when starting, ensure that it does not
+        conn.securityOperations().revokeTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+      }
+    }
+  }
+
+  @Test
+  public void testEmptyWalRecoveryCompletes() throws Exception {
+    Connector conn = getConnector();
+    MiniAccumuloClusterImpl cluster = getCluster();
+    FileSystem fs = FileSystem.get(new Configuration());
+
+    // Fake out something that looks like host:port, it's irrelevant
+    String fakeServer = "127.0.0.1:12345";
+
+    File walogs = new File(cluster.getConfig().getAccumuloDir(), ServerConstants.WAL_DIR);
+    File walogServerDir = new File(walogs, fakeServer.replace(':', '+'));
+    File emptyWalog = new File(walogServerDir, UUID.randomUUID().toString());
+
+    log.info("Created empty WAL at " + emptyWalog.toURI());
+
+    fs.create(new Path(emptyWalog.toURI())).close();
+
+    Assert.assertTrue("root user did not have write permission to metadata table",
+        conn.securityOperations().hasTablePermission("root", MetadataTable.NAME, TablePermission.WRITE));
+
+    String tableName = getUniqueNames(1)[0];
+    conn.tableOperations().create(tableName);
+
+    String tableId = conn.tableOperations().tableIdMap().get(tableName);
+    Assert.assertNotNull("Table ID was null", tableId);
+
+    LogEntry logEntry = new LogEntry();
+    logEntry.server = "127.0.0.1:12345";
+    logEntry.filename = emptyWalog.toURI().toString();
+    logEntry.tabletId = 10;
+    logEntry.logSet = Collections.singleton(logEntry.filename);
+
+    log.info("Taking {} offline", tableName);
+    conn.tableOperations().offline(tableName, true);
+
+    log.info("{} is offline", tableName);
+
+    Text row = MetadataSchema.TabletsSection.getRow(new Text(tableId), null);
+    Mutation m = new Mutation(row);
+    m.put(logEntry.getColumnFamily(), logEntry.getColumnQualifier(), logEntry.getValue());
+
+    BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    bw.addMutation(m);
+    bw.close();
+
+    log.info("Bringing {} online", tableName);
+    conn.tableOperations().online(tableName, true);
+
+    log.info("{} is online", tableName);
+
+    // Reading the table implies that recovery completed successfully (the empty file was ignored)
+    // otherwise the tablet will never come online and we won't be able to read it.
+    Scanner s = conn.createScanner(tableName, Authorizations.EMPTY);
+    Assert.assertEquals(0, Iterables.size(s));
+  }
+
+  @Test
+  public void testPartialHeaderWalRecoveryCompletes() throws Exception {
+    Connector conn = getConnector();
+    MiniAccumuloClusterImpl cluster = getCluster();
+    FileSystem fs = FileSystem.get(new Configuration());
+
+    // Fake out something that looks like host:port, it's irrelevant
+    String fakeServer = "127.0.0.1:12345";
+
+    File walogs = new File(cluster.getConfig().getAccumuloDir(), ServerConstants.WAL_DIR);
+    File walogServerDir = new File(walogs, fakeServer.replace(':', '+'));
+    File partialHeaderWalog = new File(walogServerDir, UUID.randomUUID().toString());
+
+    log.info("Created WAL with malformed header at " + partialHeaderWalog.toURI());
+
+    // Write half of the header
+    FSDataOutputStream wal = fs.create(new Path(partialHeaderWalog.toURI()));
+    wal.write(DfsLogger.LOG_FILE_HEADER_V3.getBytes(), 0, DfsLogger.LOG_FILE_HEADER_V3.length() / 2);
+    wal.close();
+
+    Assert.assertTrue("root user did not have write permission to metadata table",
+        conn.securityOperations().hasTablePermission("root", MetadataTable.NAME, TablePermission.WRITE));
+
+    String tableName = getUniqueNames(1)[0];
+    conn.tableOperations().create(tableName);
+
+    String tableId = conn.tableOperations().tableIdMap().get(tableName);
+    Assert.assertNotNull("Table ID was null", tableId);
+
+    LogEntry logEntry = new LogEntry();
+    logEntry.server = "127.0.0.1:12345";
+    logEntry.filename = partialHeaderWalog.toURI().toString();
+    logEntry.tabletId = 10;
+    logEntry.logSet = Collections.singleton(logEntry.filename);
+
+    log.info("Taking {} offline", tableName);
+    conn.tableOperations().offline(tableName, true);
+
+    log.info("{} is offline", tableName);
+
+    Text row = MetadataSchema.TabletsSection.getRow(new Text(tableId), null);
+    Mutation m = new Mutation(row);
+    m.put(logEntry.getColumnFamily(), logEntry.getColumnQualifier(), logEntry.getValue());
+
+    BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    bw.addMutation(m);
+    bw.close();
+
+    log.info("Bringing {} online", tableName);
+    conn.tableOperations().online(tableName, true);
+
+    log.info("{} is online", tableName);
+
+    // Reading the table implies that recovery completed successfully (the empty file was ignored)
+    // otherwise the tablet will never come online and we won't be able to read it.
+    Scanner s = conn.createScanner(tableName, Authorizations.EMPTY);
+    Assert.assertEquals(0, Iterables.size(s));
+  }
+
+}


[3/4] git commit: Merge branch '1.6'

Posted by el...@apache.org.
Merge branch '1.6'

Conflicts:
	server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
	server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
	server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/089408d5
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/089408d5
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/089408d5

Branch: refs/heads/master
Commit: 089408d596941e3c621037d35288bdd87deca5b7
Parents: 9a8554f d65e0e3
Author: Josh Elser <el...@apache.org>
Authored: Thu Oct 2 19:18:55 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 2 19:18:55 2014 -0400

----------------------------------------------------------------------
 .../accumulo/server/log/SortedLogState.java     |  66 ++++++
 .../master/recovery/RecoveryManager.java        |   3 +-
 .../apache/accumulo/tserver/TabletServer.java   |   3 +-
 .../apache/accumulo/tserver/log/DfsLogger.java  | 149 +++++++------
 .../apache/accumulo/tserver/log/LogSorter.java  |  36 ++-
 .../accumulo/tserver/log/MultiReader.java       |  34 +--
 .../accumulo/tserver/logger/LogReader.java      |  13 +-
 .../replication/AccumuloReplicaSystem.java      |  18 ++
 .../tserver/log/SortedLogRecoveryTest.java      |   3 +-
 .../tserver/log/TestUpgradePathForWALogs.java   |   3 +-
 .../MissingWalHeaderCompletesRecoveryIT.java    | 217 +++++++++++++++++++
 11 files changed, 453 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/089408d5/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/089408d5/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/089408d5/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 8de2b25,2bd0b47..e923ebc
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@@ -16,13 -16,20 +16,14 @@@
   */
  package org.apache.accumulo.tserver.log;
  
 -import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH;
 -import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START;
 -import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
 -import static org.apache.accumulo.tserver.logger.LogEvents.MANY_MUTATIONS;
 -import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
 -
  import java.io.DataInputStream;
  import java.io.DataOutputStream;
+ import java.io.EOFException;
  import java.io.IOException;
  import java.io.OutputStream;
 -import java.lang.reflect.InvocationTargetException;
  import java.lang.reflect.Method;
  import java.nio.channels.ClosedChannelException;
 +import java.nio.charset.StandardCharsets;
  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.Collections;
@@@ -71,11 -71,10 +72,10 @@@ import static org.apache.accumulo.tserv
   *
   */
  public class DfsLogger {
-   // Package private so that LogSorter can find this
-   static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---";
-   static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---";
+   public static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---";
+   public static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---";
  
 -  private static Logger log = Logger.getLogger(DfsLogger.class);
 +  private static final Logger log = Logger.getLogger(DfsLogger.class);
  
    public static class LogClosedException extends IOException {
      private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/089408d5/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/089408d5/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
index 681fbd3,c3f4fd0..a444b45
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
@@@ -96,7 -99,13 +99,13 @@@ public class LogReader 
  
        if (fs.isFile(path)) {
          // read log entries from a simple hdfs file
-         DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fs, path, SiteConfiguration.getInstance());
+         DFSLoggerInputStreams streams;
+         try {
 -          streams = DfsLogger.readHeaderAndReturnStream(fs, path, ServerConfiguration.getSiteConfiguration());
++          streams = DfsLogger.readHeaderAndReturnStream(fs, path, SiteConfiguration.getInstance());
+         } catch (LogHeaderIncompleteException e) {
+           log.warn("Could not read header for " + path + ". Ignoring...");
+           continue;
+         }
          DataInputStream input = streams.getDecryptingInputStream();
  
          try {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/089408d5/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index 416a86e,0000000..bed2d89
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@@ -1,700 -1,0 +1,718 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.replication;
 +
 +import java.io.ByteArrayOutputStream;
 +import java.io.DataInputStream;
 +import java.io.DataOutputStream;
 +import java.io.EOFException;
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.HashSet;
 +import java.util.Map;
 +import java.util.Set;
 +
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.ZooKeeperInstance;
 +import org.apache.accumulo.core.client.impl.ClientExecReturn;
 +import org.apache.accumulo.core.client.impl.ReplicationClient;
 +import org.apache.accumulo.core.client.impl.ServerConfigurationUtil;
 +import org.apache.accumulo.core.client.replication.ReplicaSystem;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.file.rfile.RFile;
 +import org.apache.accumulo.core.protobuf.ProtobufUtil;
 +import org.apache.accumulo.core.replication.ReplicaSystemHelper;
 +import org.apache.accumulo.core.replication.ReplicationTarget;
 +import org.apache.accumulo.core.replication.StatusUtil;
 +import org.apache.accumulo.core.replication.proto.Replication.Status;
 +import org.apache.accumulo.core.replication.thrift.KeyValues;
 +import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
 +import org.apache.accumulo.core.replication.thrift.ReplicationServicer;
 +import org.apache.accumulo.core.replication.thrift.ReplicationServicer.Client;
 +import org.apache.accumulo.core.replication.thrift.WalEdits;
 +import org.apache.accumulo.core.security.Credentials;
 +import org.apache.accumulo.core.security.thrift.TCredentials;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.conf.ServerConfigurationFactory;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.fs.VolumeManagerImpl;
 +import org.apache.accumulo.trace.instrument.Span;
 +import org.apache.accumulo.trace.instrument.Trace;
 +import org.apache.accumulo.tserver.log.DfsLogger;
 +import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams;
 +import org.apache.accumulo.tserver.logger.LogFileKey;
 +import org.apache.accumulo.tserver.logger.LogFileValue;
 +import org.apache.commons.lang.StringUtils;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.thrift.transport.TTransportException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.base.Preconditions;
 +
 +/**
 + *
 + */
 +public class AccumuloReplicaSystem implements ReplicaSystem {
 +  private static final Logger log = LoggerFactory.getLogger(AccumuloReplicaSystem.class);
 +  private static final String RFILE_SUFFIX = "." + RFile.EXTENSION;
 +
 +  private String instanceName, zookeepers;
 +  private AccumuloConfiguration conf;
 +  private VolumeManager fs;
 +
 +  protected String getInstanceName() {
 +    return instanceName;
 +  }
 +
 +  protected void setInstanceName(String instanceName) {
 +    this.instanceName = instanceName;
 +  }
 +
 +  protected String getZookeepers() {
 +    return zookeepers;
 +  }
 +
 +  protected void setZookeepers(String zookeepers) {
 +    this.zookeepers = zookeepers;
 +  }
 +
 +  protected AccumuloConfiguration getConf() {
 +    return conf;
 +  }
 +
 +  protected void setConf(AccumuloConfiguration conf) {
 +    this.conf = conf;
 +  }
 +
 +  protected VolumeManager getFs() {
 +    return fs;
 +  }
 +
 +  protected void setFs(VolumeManager fs) {
 +    this.fs = fs;
 +  }
 +
 +  /**
 +   * Generate the configuration string for this ReplicaSystem
 +   */
 +  public static String buildConfiguration(String instanceName, String zookeepers) {
 +    return instanceName + "," + zookeepers;
 +  }
 +
 +  @Override
 +  public void configure(String configuration) {
 +    Preconditions.checkNotNull(configuration);
 +
 +    // instance_name,zookeepers
 +    int index = configuration.indexOf(',');
 +    if (-1 == index) {
 +      try {
 +        Thread.sleep(1000);
 +      } catch (InterruptedException e) {
 +        Thread.currentThread().interrupt();
 +      }
 +      throw new IllegalArgumentException("Expected comma in configuration string");
 +    }
 +
 +    instanceName = configuration.substring(0, index);
 +    zookeepers = configuration.substring(index + 1);
 +
 +    conf = new ServerConfigurationFactory(HdfsZooInstance.getInstance()).getConfiguration();
 +
 +    try {
 +      fs = VolumeManagerImpl.get(conf);
 +    } catch (IOException e) {
 +      log.error("Could not connect to filesystem", e);
 +      throw new RuntimeException(e);
 +    }
 +  }
 +
 +  @Override
 +  public Status replicate(final Path p, final Status status, final ReplicationTarget target, final ReplicaSystemHelper helper) {
 +    final Instance localInstance = HdfsZooInstance.getInstance();
 +    final AccumuloConfiguration localConf = ServerConfigurationUtil.getConfiguration(localInstance);
 +    Credentials credentialsForPeer = getCredentialsForPeer(localConf, target);
 +    final TCredentials tCredsForPeer = credentialsForPeer.toThrift(localInstance);
 +
 +    try {
 +      Trace.on("AccumuloReplicaSystem");
 +
 +      Instance peerInstance = getPeerInstance(target);
 +      // Remote identifier is an integer (table id) in this case.
 +      final String remoteTableId = target.getRemoteIdentifier();
 +
 +      // Attempt the replication of this status a number of times before giving up and
 +      // trying to replicate it again later some other time.
 +      int numAttempts = localConf.getCount(Property.REPLICATION_WORK_ATTEMPTS);
 +      for (int i = 0; i < numAttempts; i++) {
 +        String peerTserver;
 +        Span span = Trace.start("Fetch peer tserver");
 +        try {
 +          // Ask the master on the remote what TServer we should talk with to replicate the data
 +          peerTserver = ReplicationClient.executeCoordinatorWithReturn(peerInstance, new ClientExecReturn<String,ReplicationCoordinator.Client>() {
 +
 +            @Override
 +            public String execute(ReplicationCoordinator.Client client) throws Exception {
 +              return client.getServicerAddress(remoteTableId, tCredsForPeer);
 +            }
 +
 +          });
 +        } catch (AccumuloException | AccumuloSecurityException e) {
 +          // No progress is made
 +          log.error("Could not connect to master at {}, cannot proceed with replication. Will retry", target, e);
 +          continue;
 +        } finally {
 +          span.stop();
 +        }
 +
 +        if (null == peerTserver) {
 +          // Something went wrong, and we didn't get a valid tserver from the remote for some reason
 +          log.warn("Did not receive tserver from master at {}, cannot proceed with replication. Will retry.", target);
 +          continue;
 +        }
 +
 +        // We have a tserver on the remote -- send the data its way.
 +        Status finalStatus;
 +        final long sizeLimit = conf.getMemoryInBytes(Property.REPLICATION_MAX_UNIT_SIZE);
 +        try {
 +          if (p.getName().endsWith(RFILE_SUFFIX)) {
 +            span = Trace.start("RFile replication");
 +            try {
 +              finalStatus = replicateRFiles(peerInstance, peerTserver, target, p, status, sizeLimit, remoteTableId, tCredsForPeer, helper);
 +            } finally {
 +              span.stop();
 +            }
 +          } else {
 +            span = Trace.start("WAL replication");
 +            try {
 +              finalStatus = replicateLogs(peerInstance, peerTserver, target, p, status, sizeLimit, remoteTableId, tCredsForPeer, helper);
 +            } finally {
 +              span.stop();
 +            }
 +          }
 +
 +          log.debug("New status for {} after replicating to {} is {}", p, peerInstance, ProtobufUtil.toString(finalStatus));
 +
 +          return finalStatus;
 +        } catch (TTransportException | AccumuloException | AccumuloSecurityException e) {
 +          log.warn("Could not connect to remote server {}, will retry", peerTserver, e);
 +          UtilWaitThread.sleep(1000);
 +        }
 +      }
 +
 +      log.info("No progress was made after {} attempts to replicate {}, returning so file can be re-queued", numAttempts, p);
 +
 +      // We made no status, punt on it for now, and let it re-queue itself for work
 +      return status;
 +    } finally {
 +      Trace.offNoFlush();
 +    }
 +  }
 +
 +  protected Status replicateRFiles(final Instance peerInstance, final String peerTserver, final ReplicationTarget target, final Path p, final Status status,
 +      final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper) throws TTransportException,
 +      AccumuloException, AccumuloSecurityException {
 +    DataInputStream input;
 +    try {
 +      input = getRFileInputStream(p);
 +    } catch (IOException e) {
 +      log.error("Could not create input stream from RFile, will retry", e);
 +      return status;
 +    }
 +
 +    Status lastStatus = status, currentStatus = status;
 +    while (true) {
 +      // Read and send a batch of mutations
 +      ReplicationStats replResult = ReplicationClient.executeServicerWithReturn(peerInstance, peerTserver, new RFileClientExecReturn(target, input, p,
 +          currentStatus, sizeLimit, remoteTableId, tcreds));
 +
 +      // Catch the overflow
 +      long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
 +      if (newBegin < 0) {
 +        newBegin = Long.MAX_VALUE;
 +      }
 +
 +      currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build();
 +
 +      log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target, ProtobufUtil.toString(currentStatus));
 +
 +      // If we got a different status
 +      if (!currentStatus.equals(lastStatus)) {
 +        // If we don't have any more work, just quit
 +        if (!StatusUtil.isWorkRequired(currentStatus)) {
 +          return currentStatus;
 +        } else {
 +          // Otherwise, let it loop and replicate some more data
 +          lastStatus = currentStatus;
 +        }
 +      } else {
 +        log.debug("Did not replicate any new data for {} to {}, (state was {})", p, target, ProtobufUtil.toString(lastStatus));
 +
 +        // otherwise, we didn't actually replicate (likely because there was an error sending the data)
 +        // we can just not record any updates, and it will be picked up again by the work assigner
 +        return status;
 +      }
 +    }
 +  }
 +
 +  protected Status replicateLogs(final Instance peerInstance, final String peerTserver, final ReplicationTarget target, final Path p, final Status status,
 +      final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper) throws TTransportException,
 +      AccumuloException, AccumuloSecurityException {
 +
 +    final Set<Integer> tids;
 +    final DataInputStream input;
 +    Span span = Trace.start("Read WAL header");
 +    span.data("file", p.toString());
 +    try {
 +      input = getWalStream(p);
++    } catch (LogHeaderIncompleteException e) {
++      log.warn("Could not read header from {}, assuming that there is no data present in the WAL, therefore replication is complete", p);
++      Status newStatus;
++      // Bump up the begin to the (infinite) end, trying to be accurate
++      if (status.getInfiniteEnd()) {
++        newStatus = Status.newBuilder(status).setBegin(Long.MAX_VALUE).build();
++      } else {
++        newStatus = Status.newBuilder(status).setBegin(status.getEnd()).build();
++      }
++      span = Trace.start("Update replication table");
++      try {
++        helper.recordNewStatus(p, newStatus, target);
++      } catch (TableNotFoundException tnfe) {
++        log.error("Tried to update status in replication table for {} as {}, but the table did not exist", p, ProtobufUtil.toString(newStatus), e);
++        throw new RuntimeException("Replication table did not exist, will retry", e);
++      } finally {
++        span.stop();
++      }
++      // With no (complete) header there is nothing to replicate from this WAL
++      return newStatus;
 +    } catch (IOException e) {
 +      log.error("Could not create stream for WAL", e);
 +      // No data sent (bytes nor records) and no progress made
 +      return status;
 +    } finally {
 +      span.stop();
 +    }
 +
 +    span = Trace.start("Consume WAL prefix");
 +    span.data("file", p.toString());
 +    try {
 +      // We want to read all records in the WAL up to the "begin" offset contained in the Status message,
 +      // building a Set of tids from DEFINE_TABLET events which correspond to table ids for future mutations
 +      tids = consumeWalPrefix(target, input, p, status, sizeLimit);
 +    } catch (IOException e) {
 +      log.warn("Unexpected error consuming file.");
 +      return status;
 +    } finally {
 +      span.stop();
 +    }
 +
 +    Status lastStatus = status, currentStatus = status;
 +    while (true) {
 +      // Set some trace info
 +      span = Trace.start("Replicate WAL batch");
 +      span.data("Batch size (bytes)", Long.toString(sizeLimit));
 +      span.data("File", p.toString());
 +      span.data("Peer instance name", peerInstance.getInstanceName());
 +      span.data("Peer tserver", peerTserver);
 +      span.data("Remote table ID", remoteTableId);
 +
 +      ReplicationStats replResult;
 +      try {
 +        // Read and send a batch of mutations
 +        replResult = ReplicationClient.executeServicerWithReturn(peerInstance, peerTserver, new WalClientExecReturn(target, input, p, currentStatus, sizeLimit,
 +            remoteTableId, tcreds, tids));
 +      } catch (Exception e) {
 +        log.error("Caught exception replicating data to {} at {}", peerInstance.getInstanceName(), peerTserver, e);
 +        throw e;
 +      } finally {
 +        span.stop();
 +      }
 +
 +      // Catch the overflow
 +      long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
 +      if (newBegin < 0) {
 +        newBegin = Long.MAX_VALUE;
 +      }
 +
 +      currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build();
 +
 +      log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target, ProtobufUtil.toString(currentStatus));
 +
 +      // If we got a different status
 +      if (!currentStatus.equals(lastStatus)) {
 +        span = Trace.start("Update replication table");
 +        try {
 +          helper.recordNewStatus(p, currentStatus, target);
 +        } catch (TableNotFoundException e) {
 +          log.error("Tried to update status in replication table for {} as {}, but the table did not exist", p, ProtobufUtil.toString(currentStatus), e);
 +          throw new RuntimeException("Replication table did not exist, will retry", e);
 +        } finally {
 +          span.stop();
 +        }
 +
 +        // If we don't have any more work, just quit
 +        if (!StatusUtil.isWorkRequired(currentStatus)) {
 +          return currentStatus;
 +        } else {
 +          // Otherwise, let it loop and replicate some more data
 +          lastStatus = currentStatus;
 +        }
 +      } else {
 +        log.debug("Did not replicate any new data for {} to {}, (state was {})", p, target, ProtobufUtil.toString(lastStatus));
 +
 +        // otherwise, we didn't actually replicate (likely because there was an error sending the data)
 +        // we can just not record any updates, and it will be picked up again by the work assigner
 +        return status;
 +      }
 +    }
 +  }
 +
 +  protected class WalClientExecReturn implements ClientExecReturn<ReplicationStats,ReplicationServicer.Client> {
 +
 +    private ReplicationTarget target;
 +    private DataInputStream input;
 +    private Path p;
 +    private Status status;
 +    private long sizeLimit;
 +    private String remoteTableId;
 +    private TCredentials tcreds;
 +    private Set<Integer> tids;
 +
 +    public WalClientExecReturn(ReplicationTarget target, DataInputStream input, Path p, Status status, long sizeLimit, String remoteTableId, TCredentials tcreds,
 +        Set<Integer> tids) {
 +      this.target = target;
 +      this.input = input;
 +      this.p = p;
 +      this.status = status;
 +      this.sizeLimit = sizeLimit;
 +      this.remoteTableId = remoteTableId;
 +      this.tcreds = tcreds;
 +      this.tids = tids;
 +    }
 +
 +    @Override
 +    public ReplicationStats execute(Client client) throws Exception {
 +      WalReplication edits = getWalEdits(target, input, p, status, sizeLimit, tids);
 +
 +      log.debug("Read {} WAL entries and retained {} bytes of WAL entries for replication to peer '{}'", (Long.MAX_VALUE == edits.entriesConsumed) ? "all"
 +          : edits.entriesConsumed, edits.sizeInBytes, p);
 +
 +      // If we have some edits to send
 +      if (0 < edits.walEdits.getEditsSize()) {
 +        long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits, tcreds);
 +        if (entriesReplicated != edits.numUpdates) {
 +          log.warn("Sent {} WAL entries for replication but {} were reported as replicated", edits.numUpdates, entriesReplicated);
 +        }
 +
 +        // We don't have to replicate every LogEvent in the file (only Mutation LogEvents), but we
 +        // want to track progress in the file relative to all LogEvents (to avoid duplicative processing/replication)
 +        return edits;
 +      } else if (edits.entriesConsumed > 0) {
 +        // Even if we send no data, we want to record a non-zero new begin value to avoid checking the same
 +        // log entries multiple times to determine if they should be sent
 +        return edits;
 +      }
 +
 +      // No data sent (bytes nor records) and no progress made
 +      return new ReplicationStats(0l, 0l, 0l);
 +    }
 +  }
 +
 +  protected class RFileClientExecReturn implements ClientExecReturn<ReplicationStats,ReplicationServicer.Client> {
 +
 +    private ReplicationTarget target;
 +    private DataInputStream input;
 +    private Path p;
 +    private Status status;
 +    private long sizeLimit;
 +    private String remoteTableId;
 +    private TCredentials tcreds;
 +
 +    public RFileClientExecReturn(ReplicationTarget target, DataInputStream input, Path p, Status status, long sizeLimit, String remoteTableId, TCredentials tcreds) {
 +      this.target = target;
 +      this.input = input;
 +      this.p = p;
 +      this.status = status;
 +      this.sizeLimit = sizeLimit;
 +      this.remoteTableId = remoteTableId;
 +      this.tcreds = tcreds;
 +    }
 +
 +    @Override
 +    public ReplicationStats execute(Client client) throws Exception {
 +      RFileReplication kvs = getKeyValues(target, input, p, status, sizeLimit);
 +      if (0 < kvs.keyValues.getKeyValuesSize()) {
 +        long entriesReplicated = client.replicateKeyValues(remoteTableId, kvs.keyValues, tcreds);
 +        if (entriesReplicated != kvs.keyValues.getKeyValuesSize()) {
 +          log.warn("Sent {} KeyValue entries for replication but only {} were reported as replicated", kvs.keyValues.getKeyValuesSize(), entriesReplicated);
 +        }
 +
 +        // Not as important to track as WALs because we don't skip any KVs in an RFile
 +        return kvs;
 +      }
 +
 +      // No data sent (bytes nor records) and no progress made
 +      return new ReplicationStats(0l, 0l, 0l);
 +    }
 +  }
 +
 +  protected Credentials getCredentialsForPeer(AccumuloConfiguration conf, ReplicationTarget target) {
 +    Preconditions.checkNotNull(conf);
 +    Preconditions.checkNotNull(target);
 +
 +    String peerName = target.getPeerName();
 +    String userKey = Property.REPLICATION_PEER_USER.getKey() + peerName, passwordKey = Property.REPLICATION_PEER_PASSWORD.getKey() + peerName;
 +    Map<String,String> peerUsers = conf.getAllPropertiesWithPrefix(Property.REPLICATION_PEER_USER);
 +    Map<String,String> peerPasswords = conf.getAllPropertiesWithPrefix(Property.REPLICATION_PEER_PASSWORD);
 +
 +    String user = peerUsers.get(userKey);
 +    String password = peerPasswords.get(passwordKey);
 +    if (null == user || null == password) {
 +      throw new IllegalArgumentException(userKey + " and " + passwordKey + " not configured, cannot replicate");
 +    }
 +
 +    return new Credentials(user, new PasswordToken(password));
 +  }
 +
 +  protected Instance getPeerInstance(ReplicationTarget target) {
 +    return new ZooKeeperInstance(instanceName, zookeepers);
 +  }
 +
 +  protected RFileReplication getKeyValues(ReplicationTarget target, DataInputStream input, Path p, Status status, long sizeLimit) {
 +    // TODO ACCUMULO-2580 Implement me
 +    throw new UnsupportedOperationException();
 +  }
 +
 +  protected Set<Integer> consumeWalPrefix(ReplicationTarget target, DataInputStream wal, Path p, Status status, long sizeLimit) throws IOException {
 +    LogFileKey key = new LogFileKey();
 +    LogFileValue value = new LogFileValue();
 +
 +    Set<Integer> desiredTids = new HashSet<>();
 +
 +    // Read through the stuff we've already processed in a previous replication attempt
 +    // We also need to track the tids that occurred earlier in the file as mutations
 +    // later on might use that tid
 +    for (long i = 0; i < status.getBegin(); i++) {
 +      key.readFields(wal);
 +      value.readFields(wal);
 +
 +      switch (key.event) {
 +        case DEFINE_TABLET:
 +          if (target.getSourceTableId().equals(key.tablet.getTableId().toString())) {
 +            desiredTids.add(key.tid);
 +          }
 +          break;
 +        default:
 +          break;
 +      }
 +    }
 +
 +    return desiredTids;
 +  }
 +
 +  public DataInputStream getWalStream(Path p) throws IOException {
 +    DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fs, p, conf);
 +    return streams.getDecryptingInputStream();
 +  }
 +
 +  protected WalReplication getWalEdits(ReplicationTarget target, DataInputStream wal, Path p, Status status, long sizeLimit, Set<Integer> desiredTids)
 +      throws IOException {
 +    WalEdits edits = new WalEdits();
 +    edits.edits = new ArrayList<ByteBuffer>();
 +    long size = 0l;
 +    long entriesConsumed = 0l;
 +    long numUpdates = 0l;
 +    LogFileKey key = new LogFileKey();
 +    LogFileValue value = new LogFileValue();
 +
 +    while (size < sizeLimit) {
 +      try {
 +        key.readFields(wal);
 +        value.readFields(wal);
 +      } catch (EOFException e) {
 +        log.debug("Caught EOFException reading {}", p);
 +        if (status.getInfiniteEnd() && status.getClosed()) {
 +          log.debug("{} is closed and has unknown length, assuming entire file has been consumed", p);
 +          entriesConsumed = Long.MAX_VALUE;
 +        }
 +        break;
 +      }
 +
 +      entriesConsumed++;
 +
 +      switch (key.event) {
 +        case DEFINE_TABLET:
 +          // For new DEFINE_TABLETs, we also need to record the new tids we see
 +          if (target.getSourceTableId().equals(key.tablet.getTableId().toString())) {
 +            desiredTids.add(key.tid);
 +          }
 +          break;
 +        case MUTATION:
 +        case MANY_MUTATIONS:
 +          // Only write out mutations for tids that are for the desired tablet
 +          if (desiredTids.contains(key.tid)) {
 +            ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +            DataOutputStream out = new DataOutputStream(baos);
 +
 +            key.write(out);
 +
 +            // Only write out the mutations that don't have the given ReplicationTarget
 +            // as a replicate source (this prevents infinite replication loops: a->b, b->a, repeat)
 +            numUpdates += writeValueAvoidingReplicationCycles(out, value, target);
 +
 +            out.flush();
 +            byte[] data = baos.toByteArray();
 +            size += data.length;
 +            edits.addToEdits(ByteBuffer.wrap(data));
 +          }
 +          break;
 +        default:
 +          log.trace("Ignoring WAL entry which doesn't contain mutations, should not have received such entries");
 +          break;
 +      }
 +    }
 +
 +    return new WalReplication(edits, size, entriesConsumed, numUpdates);
 +  }
 +
 +  /**
 +   * Wrapper around {@link LogFileValue#write(java.io.DataOutput)} which does not serialize {@link Mutation}s that do not need to be replicated to the given
 +   * {@link ReplicationTarget}
 +   */
 +  protected long writeValueAvoidingReplicationCycles(DataOutputStream out, LogFileValue value, ReplicationTarget target) throws IOException {
 +    // TODO This works like LogFileValue, and needs to be parsable by it, which makes this serialization brittle.
 +    // see matching TODO in BatchWriterReplicationReplayer
 +
 +    int mutationsToSend = 0;
 +    for (Mutation m : value.mutations) {
 +      if (!m.getReplicationSources().contains(target.getPeerName())) {
 +        mutationsToSend++;
 +      }
 +    }
 +
 +    int mutationsRemoved = value.mutations.size() - mutationsToSend;
 +    if (mutationsRemoved > 0) {
 +      log.debug("Removing {} mutations from WAL entry as they have already been replicated to {}", mutationsRemoved, target.getPeerName());
 +    }
 +
 +    // Add our name, and send it
 +    final String name = conf.get(Property.REPLICATION_NAME);
 +    if (StringUtils.isBlank(name)) {
 +      throw new IllegalArgumentException("Local system has no replication name configured");
 +    }
 +
 +    out.writeInt(mutationsToSend);
 +    for (Mutation m : value.mutations) {
 +      // If we haven't yet replicated to this peer
 +      if (!m.getReplicationSources().contains(target.getPeerName())) {
 +        m.addReplicationSource(name);
 +
 +        m.write(out);
 +      }
 +    }
 +
 +    return mutationsToSend;
 +  }
 +
 +  protected DataInputStream getRFileInputStream(Path p) throws IOException {
 +    throw new UnsupportedOperationException("Not yet implemented");
 +  }
 +
 +  public static class ReplicationStats {
 +    /**
 +     * The size, in bytes, of the data sent
 +     */
 +    public long sizeInBytes;
 +
 +    /**
 +     * The number of records sent
 +     */
 +    public long sizeInRecords;
 +
 +    /**
 +     * The number of entries consumed from the log (to increment {@link Status}'s begin)
 +     */
 +    public long entriesConsumed;
 +
 +    public ReplicationStats(long sizeInBytes, long sizeInRecords, long entriesConsumed) {
 +      this.sizeInBytes = sizeInBytes;
 +      this.sizeInRecords = sizeInRecords;
 +      this.entriesConsumed = entriesConsumed;
 +    }
 +
 +    @Override
 +    public boolean equals(Object o) {
 +      if (ReplicationStats.class.isAssignableFrom(o.getClass())) {
 +        ReplicationStats other = (ReplicationStats) o;
 +        return sizeInBytes == other.sizeInBytes && sizeInRecords == other.sizeInRecords && entriesConsumed == other.entriesConsumed;
 +      }
 +      return false;
 +    }
 +  }
 +
 +  public static class RFileReplication extends ReplicationStats {
 +    /**
 +     * The data to send
 +     */
 +    public KeyValues keyValues;
 +
 +    public RFileReplication(KeyValues kvs, long size) {
 +      super(size, kvs.keyValues.size(), kvs.keyValues.size());
 +      this.keyValues = kvs;
 +    }
 +  }
 +
 +  /**
 +   * A "struct" to avoid a nested Entry. Contains the resultant information from collecting data for replication
 +   */
 +  public static class WalReplication extends ReplicationStats {
 +    /**
 +     * The data to send over the wire
 +     */
 +    public WalEdits walEdits;
 +
 +    /**
 +     * The number of updates contained in this batch
 +     */
 +    public long numUpdates;
 +
 +    public WalReplication(WalEdits edits, long size, long entriesConsumed, long numMutations) {
 +      super(size, edits.getEditsSize(), entriesConsumed);
 +      this.walEdits = edits;
 +      this.numUpdates = numMutations;
 +    }
 +
 +    @Override
 +    public boolean equals(Object o) {
 +      if (o instanceof WalReplication) {
 +        WalReplication other = (WalReplication) o;
 +
 +        return super.equals(other) && walEdits.equals(other.walEdits) && numUpdates == other.numUpdates;
 +      }
 +
 +      return false;
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/089408d5/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
----------------------------------------------------------------------
diff --cc server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
index f01ee10,1f2ce6a..1014306
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
@@@ -23,9 -23,10 +23,10 @@@ import java.io.IOException
  import java.io.InputStream;
  import java.io.OutputStream;
  
 -import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.core.conf.SiteConfiguration;
  import org.apache.accumulo.server.fs.VolumeManager;
  import org.apache.accumulo.server.fs.VolumeManagerImpl;
+ import org.apache.accumulo.server.log.SortedLogState;
  import org.apache.commons.io.FileUtils;
  import org.apache.commons.io.IOUtils;
  import org.apache.hadoop.fs.Path;


[2/4] git commit: ACCUMULO-3182 Ensures that a WAL with a partial header (or one that is completely empty) can be successfully recovered.

Posted by el...@apache.org.
ACCUMULO-3182 Ensures that a WAL with a partial header (or one that is completely empty) can be successfully recovered.

In the case where a WAL is observed which has a partial or missing header,
the file is treated as "empty": no attempt is made to read or replay log
entries from it. This restores the normal functionality that was observed
with 1.5 WALs.

An integration test was introduced which verifies that neither an empty
WAL nor a WAL with a partial header blocks recovery of the tablet that
references the "bad" WAL.

Additionally, an enum was introduced to remove the use of hard-coded
WAL recovery status marker files: "finished" and "failed".
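
For illustration only, here is a minimal sketch of how recovery-side code can consult the
new enum instead of the hard-coded marker file names. The class and method names and the
"fs"/"sortedDir" variables are hypothetical stand-ins, not part of this commit:

import java.io.IOException;

import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.log.SortedLogState;
import org.apache.hadoop.fs.Path;

// Sketch only: decide whether a sorted-WAL destination directory is ready for recovery.
public class SortedLogStateUsageSketch {
  public static boolean isSortFinished(VolumeManager fs, Path sortedDir) throws IOException {
    if (fs.exists(SortedLogState.getFinishedMarkerPath(sortedDir))) {
      // LogSorter wrote the "finished" marker; MultiReader will accept this directory
      return true;
    }
    if (fs.exists(SortedLogState.getFailedMarkerPath(sortedDir))) {
      // LogSorter wrote the "failed" marker; the sort will be re-queued and retried
      return false;
    }
    // Neither marker exists yet: sorting is still in progress (or has not started)
    return false;
  }
}

Centralizing the marker names this way keeps the writer (LogSorter on the tserver) and the
readers (RecoveryManager, TabletServer, MultiReader) agreed on the same file names.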


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d65e0e32
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d65e0e32
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d65e0e32

Branch: refs/heads/master
Commit: d65e0e32de29206b627e578b957ce66c0d644cea
Parents: 3c90ee9
Author: Josh Elser <el...@apache.org>
Authored: Thu Oct 2 19:04:44 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 2 19:04:44 2014 -0400

----------------------------------------------------------------------
 .../accumulo/server/log/SortedLogState.java     |  66 ++++++
 .../master/recovery/RecoveryManager.java        |   3 +-
 .../apache/accumulo/tserver/TabletServer.java   |   9 +-
 .../apache/accumulo/tserver/log/DfsLogger.java  | 157 ++++++++------
 .../apache/accumulo/tserver/log/LogSorter.java  |  36 ++-
 .../accumulo/tserver/log/MultiReader.java       |  34 +--
 .../accumulo/tserver/logger/LogReader.java      |  13 +-
 .../tserver/log/SortedLogRecoveryTest.java      |   3 +-
 .../tserver/log/TestUpgradePathForWALogs.java   |   3 +-
 .../MissingWalHeaderCompletesRecoveryIT.java    | 217 +++++++++++++++++++
 10 files changed, 443 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/server/base/src/main/java/org/apache/accumulo/server/log/SortedLogState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/log/SortedLogState.java b/server/base/src/main/java/org/apache/accumulo/server/log/SortedLogState.java
new file mode 100644
index 0000000..f337cd8
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/log/SortedLogState.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.log;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A file is written in the destination directory for the sorting of write-ahead logs that need recovering. The value of {@link #marker} is the name of the file
+ * that will exist in the sorted output directory.
+ */
+public enum SortedLogState {
+  FINISHED("finished"), FAILED("failed");
+
+  private String marker;
+
+  private SortedLogState(String marker) {
+    this.marker = marker;
+  }
+
+  public String getMarker() {
+    return marker;
+  }
+
+  public static boolean isFinished(String fileName) {
+    return FINISHED.getMarker().equals(fileName);
+  }
+
+  public static boolean isFailed(String fileName) {
+    return FAILED.getMarker().equals(fileName);
+  }
+
+  public static Path getFinishedMarkerPath(String rootPath) {
+    return new Path(rootPath, FINISHED.getMarker());
+  }
+
+  public static Path getFinishedMarkerPath(Path rootPath) {
+    return new Path(rootPath, FINISHED.getMarker());
+  }
+
+  public static Path getFailedMarkerPath(String rootPath) {
+    return new Path(rootPath, FAILED.getMarker());
+  }
+  
+  public static Path getFailedMarkerPath(Path rootPath) {
+    return new Path(rootPath, FAILED.getMarker());
+  }
+
+  @Override
+  public String toString() {
+    return marker;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java b/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
index 76d3520..0c2e1f0 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
@@ -38,6 +38,7 @@ import org.apache.accumulo.master.Master;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.fs.VolumeManager.FileType;
 import org.apache.accumulo.server.fs.VolumeUtil;
+import org.apache.accumulo.server.log.SortedLogState;
 import org.apache.accumulo.server.master.recovery.HadoopLogCloser;
 import org.apache.accumulo.server.master.recovery.LogCloser;
 import org.apache.accumulo.server.master.recovery.RecoveryPath;
@@ -154,7 +155,7 @@ public class RecoveryManager {
           }
         }
 
-        if (master.getFileSystem().exists(new Path(dest, "finished"))) {
+        if (master.getFileSystem().exists(SortedLogState.getFinishedMarkerPath(dest))) {
           synchronized (this) {
             closeTasksQueued.remove(sortId);
             recoveryDelay.remove(sortId);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 03dc86d..2551bd0 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -161,6 +161,7 @@ import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManager.FileType;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.fs.VolumeUtil;
+import org.apache.accumulo.server.log.SortedLogState;
 import org.apache.accumulo.server.master.recovery.RecoveryPath;
 import org.apache.accumulo.server.master.state.Assignment;
 import org.apache.accumulo.server.master.state.DistributedStoreException;
@@ -250,7 +251,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
   private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
   private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000;
   private static final long TIME_BETWEEN_GC_CHECKS = 5000;
-  
+
   private TabletServerLogger logger;
 
   protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics();
@@ -336,7 +337,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     if (sawChange) {
       log.debug(sb.toString());
     }
-    
+
     final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
     if (lastMemoryCheckTime > 0 && lastMemoryCheckTime < now) {
       long diff = now - lastMemoryCheckTime;
@@ -347,7 +348,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
       lastMemoryCheckTime = now;
       return;
     }
-    
+
     if (maxIncreaseInCollectionTime > keepAliveTimeout) {
       Halt.halt("Garbage collection may be interfering with lock keep-alive.  Halting.", -1);
     }
@@ -3700,7 +3701,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
       Path recovery = null;
       for (String log : entry.logSet) {
         Path finished = RecoveryPath.getRecoveryPath(fs, fs.getFullPath(FileType.WAL, log));
-        finished = new Path(finished, "finished");
+        finished = SortedLogState.getFinishedMarkerPath(finished);
         TabletServer.log.info("Looking for " + finished);
         if (fs.exists(finished)) {
           recovery = finished.getParent();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index b152380..2bd0b47 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -24,6 +24,7 @@ import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.lang.reflect.InvocationTargetException;
@@ -67,12 +68,11 @@ import org.apache.log4j.Logger;
 
 /**
  * Wrap a connection to a logger.
- * 
+ *
  */
 public class DfsLogger {
-  // Package private so that LogSorter can find this
-  static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---";
-  static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---";
+  public static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---";
+  public static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---";
 
   private static Logger log = Logger.getLogger(DfsLogger.class);
 
@@ -84,6 +84,26 @@ public class DfsLogger {
     }
   }
 
+  /**
+   * A well-timed tabletserver failure could result in an incomplete header written to a write-ahead log. This exception is thrown when the header cannot be
+   * read from a WAL which should only happen when the tserver dies as described.
+   */
+  public static class LogHeaderIncompleteException extends IOException {
+    private static final long serialVersionUID = 1l;
+
+    public LogHeaderIncompleteException(String msg) {
+      super(msg);
+    }
+
+    public LogHeaderIncompleteException(String msg, Throwable cause) {
+      super(msg, cause);
+    }
+
+    public LogHeaderIncompleteException(Throwable cause) {
+      super(cause);
+    }
+  }
+
   public static class DFSLoggerInputStreams {
 
     private FSDataInputStream originalInput;
@@ -229,7 +249,9 @@ public class DfsLogger {
 
   /**
   * Reference a pre-existing log file.
-   * @param meta the cq for the "log" entry in +r/!0
+   *
+   * @param meta
+   *          the cq for the "log" entry in +r/!0
    */
   public DfsLogger(ServerResources conf, String filename, String meta) throws IOException {
     this.conf = conf;
@@ -243,75 +265,82 @@ public class DfsLogger {
 
     byte[] magic = DfsLogger.LOG_FILE_HEADER_V3.getBytes();
     byte[] magicBuffer = new byte[magic.length];
-    input.readFully(magicBuffer);
-    if (Arrays.equals(magicBuffer, magic)) {
-      // additional parameters it needs from the underlying stream.
-      String cryptoModuleClassname = input.readUTF();
-      CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(cryptoModuleClassname);
+    try {
+      input.readFully(magicBuffer);
+      if (Arrays.equals(magicBuffer, magic)) {
+        // additional parameters it needs from the underlying stream.
+        String cryptoModuleClassname = input.readUTF();
+        CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(cryptoModuleClassname);
 
-      // Create the parameters and set the input stream into those parameters
-      CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
-      params.setEncryptedInputStream(input);
+        // Create the parameters and set the input stream into those parameters
+        CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
+        params.setEncryptedInputStream(input);
 
-      // Create the plaintext input stream from the encrypted one
-      params = cryptoModule.getDecryptingInputStream(params);
+        // Create the plaintext input stream from the encrypted one
+        params = cryptoModule.getDecryptingInputStream(params);
 
-      if (params.getPlaintextInputStream() instanceof DataInputStream) {
-        decryptingInput = (DataInputStream) params.getPlaintextInputStream();
-      } else {
-        decryptingInput = new DataInputStream(params.getPlaintextInputStream());
-      }
-    } else {
-      input.seek(0);
-      byte[] magicV2 = DfsLogger.LOG_FILE_HEADER_V2.getBytes();
-      byte[] magicBufferV2 = new byte[magicV2.length];
-      input.readFully(magicBufferV2);
-
-      if (Arrays.equals(magicBufferV2, magicV2)) {
-        // Log files from 1.5 dump their options in raw to the logger files. Since we don't know the class
-        // that needs to read those files, we can make a couple of basic assumptions. Either it's going to be
-        // the NullCryptoModule (no crypto) or the DefaultCryptoModule.
-
-        // If it's null, we won't have any parameters whatsoever. First, let's attempt to read
-        // parameters
-        Map<String,String> opts = new HashMap<String,String>();
-        int count = input.readInt();
-        for (int i = 0; i < count; i++) {
-          String key = input.readUTF();
-          String value = input.readUTF();
-          opts.put(key, value);
+        if (params.getPlaintextInputStream() instanceof DataInputStream) {
+          decryptingInput = (DataInputStream) params.getPlaintextInputStream();
+        } else {
+          decryptingInput = new DataInputStream(params.getPlaintextInputStream());
         }
+      } else {
+        input.seek(0);
+        byte[] magicV2 = DfsLogger.LOG_FILE_HEADER_V2.getBytes();
+        byte[] magicBufferV2 = new byte[magicV2.length];
+        input.readFully(magicBufferV2);
+
+        if (Arrays.equals(magicBufferV2, magicV2)) {
+          // Log files from 1.5 dump their options in raw to the logger files. Since we don't know the class
+          // that needs to read those files, we can make a couple of basic assumptions. Either it's going to be
+          // the NullCryptoModule (no crypto) or the DefaultCryptoModule.
+
+          // If it's null, we won't have any parameters whatsoever. First, let's attempt to read
+          // parameters
+          Map<String,String> opts = new HashMap<String,String>();
+          int count = input.readInt();
+          for (int i = 0; i < count; i++) {
+            String key = input.readUTF();
+            String value = input.readUTF();
+            opts.put(key, value);
+          }
 
-        if (opts.size() == 0) {
-          // NullCryptoModule, we're done
-          decryptingInput = input;
-        } else {
+          if (opts.size() == 0) {
+            // NullCryptoModule, we're done
+            decryptingInput = input;
+          } else {
 
-          // The DefaultCryptoModule will want to read the parameters from the underlying file, so we will put the file back to that spot.
-          org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory
-              .getCryptoModule(DefaultCryptoModule.class.getName());
+            // The DefaultCryptoModule will want to read the parameters from the underlying file, so we will put the file back to that spot.
+            org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory
+                .getCryptoModule(DefaultCryptoModule.class.getName());
 
-          CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
+            CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
 
-          input.seek(0);
-          input.readFully(magicBufferV2);
-          params.setEncryptedInputStream(input);
+            input.seek(0);
+            input.readFully(magicBufferV2);
+            params.setEncryptedInputStream(input);
 
-          params = cryptoModule.getDecryptingInputStream(params);
-          if (params.getPlaintextInputStream() instanceof DataInputStream) {
-            decryptingInput = (DataInputStream) params.getPlaintextInputStream();
-          } else {
-            decryptingInput = new DataInputStream(params.getPlaintextInputStream());
+            params = cryptoModule.getDecryptingInputStream(params);
+            if (params.getPlaintextInputStream() instanceof DataInputStream) {
+              decryptingInput = (DataInputStream) params.getPlaintextInputStream();
+            } else {
+              decryptingInput = new DataInputStream(params.getPlaintextInputStream());
+            }
           }
-        }
 
-      } else {
+        } else {
 
-        input.seek(0);
-        decryptingInput = input;
-      }
+          input.seek(0);
+          decryptingInput = input;
+        }
 
+      }
+    } catch (EOFException e) {
+      log.warn("Got EOFException trying to read WAL header information, assuming the rest of the file (" + path + ") has no data.");
+      // A TabletServer might have died before the (complete) header was written
+      throw new LogHeaderIncompleteException(e);
     }
+
     return new DFSLoggerInputStreams(input, decryptingInput);
   }
 
@@ -438,7 +467,7 @@ public class DfsLogger {
     }
 
     // wait for background thread to finish before closing log file
-    if(syncThread != null){
+    if (syncThread != null) {
       try {
         syncThread.join();
       } catch (InterruptedException e) {
@@ -446,12 +475,12 @@ public class DfsLogger {
       }
     }
 
-    //expect workq should be empty at this point
-    if(workQueue.size() != 0){
+    // expect workq should be empty at this point
+    if (workQueue.size() != 0) {
       log.error("WAL work queue not empty after sync thread exited");
       throw new IllegalStateException("WAL work queue not empty after sync thread exited");
     }
-    
+
     if (encryptingLogFile != null)
       try {
         logFile.close();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index bb8e3c7..6095f88 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -37,9 +37,11 @@ import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.SortedLogState;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
 import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams;
+import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -50,7 +52,7 @@ import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
 
 /**
- * 
+ *
  */
 public class LogSorter {
 
@@ -110,7 +112,19 @@ public class LogSorter {
         // the following call does not throw an exception if the file/dir does not exist
         fs.deleteRecursively(new Path(destPath));
 
-        DFSLoggerInputStreams inputStreams = DfsLogger.readHeaderAndReturnStream(fs, srcPath, conf);
+        DFSLoggerInputStreams inputStreams;
+        try {
+          inputStreams = DfsLogger.readHeaderAndReturnStream(fs, srcPath, conf);
+        } catch (LogHeaderIncompleteException e) {
+          log.warn("Could not read header from write-ahead log " + srcPath + ". Not sorting.");
+          // Creating a 'finished' marker will cause recovery to proceed normally and the
+          // empty file will be correctly ignored downstream.
+          fs.mkdirs(new Path(destPath));
+          writeBuffer(destPath, Collections.<Pair<LogFileKey,LogFileValue>> emptyList(), part++);
+          fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
+          return;
+        }
+
         this.input = inputStreams.getOriginalInput();
         this.decryptingInput = inputStreams.getDecryptingInputStream();
 
@@ -140,7 +154,7 @@ public class LogSorter {
         try {
           // parent dir may not exist
           fs.mkdirs(new Path(destPath));
-          fs.create(new Path(destPath, "failed")).close();
+          fs.create(SortedLogState.getFailedMarkerPath(destPath)).close();
         } catch (IOException e) {
           log.error("Error creating failed flag file " + name, e);
         }
@@ -158,10 +172,10 @@ public class LogSorter {
       }
     }
 
-    private void writeBuffer(String destPath, ArrayList<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
+    private void writeBuffer(String destPath, List<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
       Path path = new Path(destPath, String.format("part-r-%05d", part++));
       FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
-      
+
       @SuppressWarnings("deprecation")
       MapFile.Writer output = new MapFile.Writer(ns.getConf(), ns, path.toString(), LogFileKey.class, LogFileValue.class);
       try {
@@ -180,10 +194,14 @@ public class LogSorter {
     }
 
     synchronized void close() throws IOException {
-      bytesCopied = input.getPos();
-      input.close();
-      decryptingInput.close();
-      input = null;
+      // If we receive an empty or malformed-header WAL, we won't
+      // have input streams that need closing. Avoid the NPE.
+      if (null != input) {
+        bytesCopied = input.getPos();
+        input.close();
+        decryptingInput.close();
+        input = null;
+      }
     }
 
     public synchronized long getSortTime() {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/server/tserver/src/main/java/org/apache/accumulo/tserver/log/MultiReader.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/MultiReader.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/MultiReader.java
index 541f075..c2a0683 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/MultiReader.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/MultiReader.java
@@ -20,6 +20,7 @@ import java.io.EOFException;
 import java.io.IOException;
 
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.SortedLogState;
 import org.apache.commons.collections.buffer.PriorityBuffer;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -32,23 +33,23 @@ import org.apache.hadoop.io.WritableComparable;
 
 /**
  * Provide simple Map.Reader methods over multiple Maps.
- * 
+ *
  * Presently only supports next() and seek() and works on all the Map directories within a directory. The primary purpose of this class is to merge the results
  * of multiple Reduce jobs that result in Map output files.
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class MultiReader {
-  
+
   /**
    * Group together the next key/value from a Reader with the Reader
-   * 
+   *
    */
   private static class Index implements Comparable<Index> {
     Reader reader;
     WritableComparable key;
     Writable value;
     boolean cached = false;
-    
+
     private static Object create(java.lang.Class<?> klass) {
       try {
         return klass.getConstructor().newInstance();
@@ -56,19 +57,20 @@ public class MultiReader {
         throw new RuntimeException("Unable to construct objects to use for comparison");
       }
     }
-    
+
     public Index(Reader reader) {
       this.reader = reader;
       key = (WritableComparable) create(reader.getKeyClass());
       value = (Writable) create(reader.getValueClass());
     }
-    
+
     private void cache() throws IOException {
       if (!cached && reader.next(key, value)) {
         cached = true;
       }
     }
-    
+
+    @Override
     public int compareTo(Index o) {
       try {
         cache();
@@ -84,16 +86,16 @@ public class MultiReader {
       }
     }
   }
-  
+
   private PriorityBuffer heap = new PriorityBuffer();
-  
+
   @SuppressWarnings("deprecation")
   public MultiReader(VolumeManager fs, Path directory) throws IOException {
     boolean foundFinish = false;
     for (FileStatus child : fs.listStatus(directory)) {
       if (child.getPath().getName().startsWith("_"))
         continue;
-      if (child.getPath().getName().equals("finished")) {
+      if (SortedLogState.isFinished(child.getPath().getName())) {
         foundFinish = true;
         continue;
       }
@@ -101,9 +103,9 @@ public class MultiReader {
       heap.add(new Index(new Reader(ns, child.getPath().toString(), ns.getConf())));
     }
     if (!foundFinish)
-      throw new IOException("Sort \"finished\" flag not found in " + directory);
+      throw new IOException("Sort \"" + SortedLogState.FINISHED.getMarker() + "\" flag not found in " + directory);
   }
-  
+
   private static void copy(Writable src, Writable dest) throws IOException {
     // not exactly efficient...
     DataOutputBuffer output = new DataOutputBuffer();
@@ -112,7 +114,7 @@ public class MultiReader {
     input.reset(output.getData(), output.getLength());
     dest.readFields(input);
   }
-  
+
   public synchronized boolean next(WritableComparable key, Writable val) throws IOException {
     Index elt = (Index) heap.remove();
     try {
@@ -129,7 +131,7 @@ public class MultiReader {
     }
     return true;
   }
-  
+
   public synchronized boolean seek(WritableComparable key) throws IOException {
     PriorityBuffer reheap = new PriorityBuffer(heap.size());
     boolean result = false;
@@ -149,7 +151,7 @@ public class MultiReader {
     heap = reheap;
     return result;
   }
-  
+
   public void close() throws IOException {
     IOException problem = null;
     for (Object obj : heap) {
@@ -164,5 +166,5 @@ public class MultiReader {
       throw problem;
     heap = null;
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
index a1229e7..c3f4fd0 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
@@ -35,14 +35,17 @@ import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.tserver.log.DfsLogger;
 import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams;
+import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
 import org.apache.accumulo.tserver.log.MultiReader;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 
 public class LogReader {
+  private static final Logger log = Logger.getLogger(LogReader.class);
 
   static class Opts extends Help {
     @Parameter(names = "-r", description = "print only mutations associated with the given row")
@@ -59,7 +62,7 @@ public class LogReader {
 
   /**
    * Dump a Log File (Map or Sequence) to stdout. Will read from HDFS or local file system.
-   * 
+   *
    * @param args
    *          - first argument is the file to print
    */
@@ -96,7 +99,13 @@ public class LogReader {
 
       if (fs.isFile(path)) {
         // read log entries from a simple hdfs file
-        DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fs, path, ServerConfiguration.getSiteConfiguration());
+        DFSLoggerInputStreams streams;
+        try {
+          streams = DfsLogger.readHeaderAndReturnStream(fs, path, ServerConfiguration.getSiteConfiguration());
+        } catch (LogHeaderIncompleteException e) {
+          log.warn("Could not read header for " + path + ". Ignoring...");
+          continue;
+        }
         DataInputStream input = streams.getDecryptingInputStream();
 
         try {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
index fffa15e..03361d1 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
@@ -40,6 +40,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.server.data.ServerMutation;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.log.SortedLogState;
 import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
@@ -132,7 +133,7 @@ public class SortedLogRecoveryTest {
           map.append(lfe.key, lfe.value);
         }
         map.close();
-        ns.create(new Path(path, "finished")).close();
+        ns.create(SortedLogState.getFinishedMarkerPath(path)).close();
         dirs.add(new Path(path));
       }
       // Recover

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
index d6c23e3..1f2ce6a 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
@@ -26,6 +26,7 @@ import java.io.OutputStream;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.log.SortedLogState;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.Path;
@@ -66,7 +67,7 @@ public class TestUpgradePathForWALogs {
     fs = VolumeManagerImpl.getLocal(path);
     Path manyMapsPath = new Path("file://" + path);
     fs.mkdirs(manyMapsPath);
-    fs.create(new Path(manyMapsPath, "finished")).close();
+    fs.create(SortedLogState.getFinishedMarkerPath(manyMapsPath)).close();
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java b/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
new file mode 100644
index 0000000..7f2f6f9
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.accumulo.tserver.log.DfsLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Test that tablet recovery completes when a WAL referenced in the metadata table is empty or contains only a partial file header.
+ */
+public class MissingWalHeaderCompletesRecoveryIT extends ConfigurableMacIT {
+  private static final Logger log = LoggerFactory.getLogger(MissingWalHeaderCompletesRecoveryIT.class);
+
+  private static boolean rootHasWritePermission;
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 2 * 60;
+  }
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration conf) {
+    cfg.setNumTservers(1);
+    cfg.setProperty(Property.MASTER_RECOVERY_DELAY, "1s");
+    // Make sure the GC doesn't delete the file before the metadata reference is added
+    cfg.setProperty(Property.GC_CYCLE_START, "999999s");
+    conf.set("fs.file.impl", RawLocalFileSystem.class.getName());
+  }
+
+  @Before
+  public void setupMetadataPermission() throws Exception {
+    Connector conn = getConnector();
+    rootHasWritePermission = conn.securityOperations().hasTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+    if (!rootHasWritePermission) {
+      conn.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+      // Make sure it propagates through ZK
+      Thread.sleep(5000);
+    }
+  }
+
+  @After
+  public void resetMetadataPermission() throws Exception {
+    Connector conn = getConnector();
+    // Final state doesn't match the original
+    if (rootHasWritePermission != conn.securityOperations().hasTablePermission("root", MetadataTable.NAME, TablePermission.WRITE)) {
+      if (rootHasWritePermission) {
+        // root had write permission when starting, ensure root still does
+        conn.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+      } else {
+        // root did not have write permission when starting, ensure that it does not
+        conn.securityOperations().revokeTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+      }
+    }
+  }
+
+  @Test
+  public void testEmptyWalRecoveryCompletes() throws Exception {
+    Connector conn = getConnector();
+    MiniAccumuloClusterImpl cluster = getCluster();
+    FileSystem fs = FileSystem.get(new Configuration());
+
+    // Fake out something that looks like host:port; the actual value is irrelevant
+    String fakeServer = "127.0.0.1:12345";
+
+    File walogs = new File(cluster.getConfig().getAccumuloDir(), ServerConstants.WAL_DIR);
+    File walogServerDir = new File(walogs, fakeServer.replace(':', '+'));
+    File emptyWalog = new File(walogServerDir, UUID.randomUUID().toString());
+
+    log.info("Created empty WAL at " + emptyWalog.toURI());
+
+    fs.create(new Path(emptyWalog.toURI())).close();
+
+    Assert.assertTrue("root user did not have write permission to metadata table",
+        conn.securityOperations().hasTablePermission("root", MetadataTable.NAME, TablePermission.WRITE));
+
+    String tableName = getUniqueNames(1)[0];
+    conn.tableOperations().create(tableName);
+
+    String tableId = conn.tableOperations().tableIdMap().get(tableName);
+    Assert.assertNotNull("Table ID was null", tableId);
+
+    LogEntry logEntry = new LogEntry();
+    logEntry.server = "127.0.0.1:12345";
+    logEntry.filename = emptyWalog.toURI().toString();
+    logEntry.tabletId = 10;
+    logEntry.logSet = Collections.singleton(logEntry.filename);
+
+    log.info("Taking {} offline", tableName);
+    conn.tableOperations().offline(tableName, true);
+
+    log.info("{} is offline", tableName);
+
+    Text row = MetadataSchema.TabletsSection.getRow(new Text(tableId), null);
+    Mutation m = new Mutation(row);
+    m.put(logEntry.getColumnFamily(), logEntry.getColumnQualifier(), logEntry.getValue());
+
+    BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    bw.addMutation(m);
+    bw.close();
+
+    log.info("Bringing {} online", tableName);
+    conn.tableOperations().online(tableName, true);
+
+    log.info("{} is online", tableName);
+
+    // Reading the table implies that recovery completed successfully (the empty file was ignored);
+    // otherwise the tablet will never come online and we won't be able to read it.
+    Scanner s = conn.createScanner(tableName, Authorizations.EMPTY);
+    Assert.assertEquals(0, Iterables.size(s));
+  }
+
+  @Test
+  public void testPartialHeaderWalRecoveryCompletes() throws Exception {
+    Connector conn = getConnector();
+    MiniAccumuloClusterImpl cluster = getCluster();
+    FileSystem fs = FileSystem.get(new Configuration());
+
+    // Fake out something that looks like host:port; the actual value is irrelevant
+    String fakeServer = "127.0.0.1:12345";
+
+    File walogs = new File(cluster.getConfig().getAccumuloDir(), ServerConstants.WAL_DIR);
+    File walogServerDir = new File(walogs, fakeServer.replace(':', '+'));
+    File partialHeaderWalog = new File(walogServerDir, UUID.randomUUID().toString());
+
+    log.info("Created WAL with malformed header at " + partialHeaderWalog.toURI());
+
+    // Write half of the header
+    FSDataOutputStream wal = fs.create(new Path(partialHeaderWalog.toURI()));
+    wal.write(DfsLogger.LOG_FILE_HEADER_V3.getBytes(), 0, DfsLogger.LOG_FILE_HEADER_V3.length() / 2);
+    wal.close();
+
+    Assert.assertTrue("root user did not have write permission to metadata table",
+        conn.securityOperations().hasTablePermission("root", MetadataTable.NAME, TablePermission.WRITE));
+
+    String tableName = getUniqueNames(1)[0];
+    conn.tableOperations().create(tableName);
+
+    String tableId = conn.tableOperations().tableIdMap().get(tableName);
+    Assert.assertNotNull("Table ID was null", tableId);
+
+    LogEntry logEntry = new LogEntry();
+    logEntry.server = "127.0.0.1:12345";
+    logEntry.filename = partialHeaderWalog.toURI().toString();
+    logEntry.tabletId = 10;
+    logEntry.logSet = Collections.singleton(logEntry.filename);
+
+    log.info("Taking {} offline", tableName);
+    conn.tableOperations().offline(tableName, true);
+
+    log.info("{} is offline", tableName);
+
+    Text row = MetadataSchema.TabletsSection.getRow(new Text(tableId), null);
+    Mutation m = new Mutation(row);
+    m.put(logEntry.getColumnFamily(), logEntry.getColumnQualifier(), logEntry.getValue());
+
+    BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    bw.addMutation(m);
+    bw.close();
+
+    log.info("Bringing {} online", tableName);
+    conn.tableOperations().online(tableName, true);
+
+    log.info("{} is online", tableName);
+
+    // Reading the table implies that recovery completed successfully (the WAL with the partial header was ignored);
+    // otherwise the tablet will never come online and we won't be able to read it.
+    Scanner s = conn.createScanner(tableName, Authorizations.EMPTY);
+    Assert.assertEquals(0, Iterables.size(s));
+  }
+
+}
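
Not part of the commit's diff: the partial-header test above writes only the first half of DfsLogger.LOG_FILE_HEADER_V3 before closing the WAL. The sketch below shows the general kind of header check that would classify such a file as empty rather than failing recovery; the class and method names here are hypothetical and this is not the actual DfsLogger code:

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class WalHeaderCheckSketch {
  /**
   * Returns true only when the stream begins with the complete expected header.
   * An empty file or a truncated header yields false, so a caller can treat the
   * log as containing no entries instead of failing recovery.
   */
  public static boolean hasCompleteHeader(DataInputStream in, String expectedHeader) throws IOException {
    byte[] expected = expectedHeader.getBytes(StandardCharsets.UTF_8);
    byte[] actual = new byte[expected.length];
    try {
      in.readFully(actual);
    } catch (EOFException e) {
      // Fewer bytes present than the header length: partial or missing header.
      return false;
    }
    return Arrays.equals(expected, actual);
  }
}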


[4/4] git commit: Merge branch '1.6' of https://git-wip-us.apache.org/repos/asf/accumulo into 1.6

Posted by el...@apache.org.
Merge branch '1.6' of https://git-wip-us.apache.org/repos/asf/accumulo into 1.6


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/a65f1234
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/a65f1234
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/a65f1234

Branch: refs/heads/1.6
Commit: a65f1234e35c9ef89d8e7eb89f40acc1bc943563
Parents: d65e0e3 65acebb
Author: Josh Elser <el...@apache.org>
Authored: Thu Oct 2 19:19:11 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 2 19:19:11 2014 -0400

----------------------------------------------------------------------
 .../test/functional/BulkSplitOptimizationIT.java         | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)
----------------------------------------------------------------------