You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/10/23 20:58:18 UTC

[1/9] git commit: ACCUMULO-3249 Remove an unnecessary utility method for clarity and reduce visibility on some internal methods

Repository: accumulo
Updated Branches:
  refs/heads/master 7aca0326e -> eaaebdf33


ACCUMULO-3249 Remove an unnecessary utility method for clarity and reduce visibility on some internal methods

A number of the methods in ReplicationTableUtil shouldn't be accessed directly
by clients but were still public or protected. Narrow their visibility by making
them package-private instead (which still lets tests in the same package use them).


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ebc4a041
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ebc4a041
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ebc4a041

Branch: refs/heads/master
Commit: ebc4a0411bc523343a0b38fe75d221f28cd1e5c6
Parents: d75be63
Author: Josh Elser <el...@apache.org>
Authored: Wed Oct 22 13:22:13 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 23 11:18:33 2014 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/server/fs/VolumeUtil.java    |  4 +++-
 .../accumulo/server/util/ReplicationTableUtil.java   | 15 ++++-----------
 2 files changed, 7 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebc4a041/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
index 82b77ee..13f1e83 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
@@ -241,7 +241,9 @@ public class VolumeUtil {
         Status status = StatusUtil.fileClosed();
         log.debug("Tablet directory switched, need to record old log files " + logsToRemove + " " + ProtobufUtil.toString(status));
         // Before deleting these logs, we need to mark them for replication
-        ReplicationTableUtil.updateLogs(creds, extent, logsToRemove, status);
+        for (LogEntry logEntry : logsToRemove) {
+          ReplicationTableUtil.updateFiles(creds, extent, logEntry.logSet, status);
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebc4a041/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
index ab5ee86..8f0656c 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
@@ -45,7 +45,6 @@ import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.StatusFormatter;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.client.HdfsZooInstance;
@@ -78,11 +77,11 @@ public class ReplicationTableUtil {
    * @param writer
    *          A Writer to use for the given credentials
    */
-  protected synchronized static void addWriter(Credentials creds, Writer writer) {
+  synchronized static void addWriter(Credentials creds, Writer writer) {
     writers.put(creds, writer);
   }
 
-  protected synchronized static Writer getWriter(Credentials credentials) {
+  synchronized static Writer getWriter(Credentials credentials) {
     Writer replicationTable = writers.get(credentials);
     if (replicationTable == null) {
       Instance inst = HdfsZooInstance.getInstance();
@@ -157,7 +156,7 @@ public class ReplicationTableUtil {
   /**
    * Write the given Mutation to the replication table.
    */
-  protected static void update(Credentials credentials, Mutation m, KeyExtent extent) {
+  static void update(Credentials credentials, Mutation m, KeyExtent extent) {
     Writer t = getWriter(credentials);
     while (true) {
       try {
@@ -176,12 +175,6 @@ public class ReplicationTableUtil {
     }
   }
 
-  public static void updateLogs(Credentials creds, KeyExtent extent, Collection<LogEntry> logs, Status stat) {
-    for (LogEntry entry : logs) {
-      updateFiles(creds, extent, entry.logSet, stat);
-    }
-  }
-
   /**
    * Write replication ingest entries for each provided file with the given {@link Status}.
    */
@@ -201,7 +194,7 @@ public class ReplicationTableUtil {
     }
   }
 
-  public static Mutation createUpdateMutation(Path file, Value v, KeyExtent extent) {
+  static Mutation createUpdateMutation(Path file, Value v, KeyExtent extent) {
     // Need to normalize the file path so we can assuredly find it again later
     return createUpdateMutation(new Text(ReplicationSection.getRowPrefix() + file.toString()), v, extent);
   }


[3/9] git commit: ACCUMULO-3249 Add some missing logging to get a clearer picture.

Posted by el...@apache.org.
ACCUMULO-3249 Add some missing logging to get a clearer picture.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/24921c72
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/24921c72
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/24921c72

Branch: refs/heads/master
Commit: 24921c72745a2908202ca13c084d1ec46aba4950
Parents: 7aca032
Author: Josh Elser <el...@apache.org>
Authored: Tue Oct 21 16:57:49 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 23 11:18:33 2014 -0400

----------------------------------------------------------------------
 .../accumulo/master/replication/FinishedWorkUpdater.java       | 4 +++-
 .../master/replication/RemoveCompleteReplicationRecords.java   | 6 +++---
 .../accumulo/master/replication/UnorderedWorkAssigner.java     | 4 +++-
 3 files changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/24921c72/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java b/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
index 8048e02..b816eab 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
@@ -122,7 +122,7 @@ public class FinishedWorkUpdater implements Runnable {
 
           // Find the minimum value for begin (everyone has replicated up to this offset in the file)
           tableIdToProgress.put(target.getSourceTableId(), Math.min(tableIdToProgress.get(target.getSourceTableId()), status.getBegin()));
-        } 
+        }
 
         if (error) {
           continue;
@@ -151,6 +151,8 @@ public class FinishedWorkUpdater implements Runnable {
           // Make the mutation
           StatusSection.add(replMutation, buffer, serializedUpdatedStatus);
 
+          log.debug("Updating replication status entry for {} with {}", serializedRow.getKey().getRow(), updatedStatus);
+
           try {
             replBw.addMutation(replMutation);
           } catch (MutationsRejectedException e) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/24921c72/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java b/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
index ecf0c9e..c670d51 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
@@ -106,7 +106,7 @@ public class RemoveCompleteReplicationRecords implements Runnable {
   /**
    * Removes {@link Status} records read from the given {@code bs} and writes a delete, using the given {@code bw}, when that {@link Status} is fully replicated
    * and closed, as defined by {@link StatusUtil#isSafeForRemoval(Status)}.
-   * 
+   *
    * @param conn
    *          A Connector
    * @param bs
@@ -164,6 +164,8 @@ public class RemoveCompleteReplicationRecords implements Runnable {
       k.getColumnFamily(colf);
       k.getColumnQualifier(colq);
 
+      log.debug("Removing {} {}:{} from replication table", row, colf, colq);
+
       m.putDelete(colf, colq);
 
       String tableId;
@@ -188,8 +190,6 @@ public class RemoveCompleteReplicationRecords implements Runnable {
       recordsRemoved++;
     }
 
-    log.info("Removing {} from the replication table", row);
-
     List<Mutation> mutations = new ArrayList<>();
     mutations.add(m);
     for (Entry<String,Long> entry : tableToTimeCreated.entrySet()) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/24921c72/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java
index 9042e2d..9a28dd4 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java
@@ -70,6 +70,7 @@ public class UnorderedWorkAssigner extends DistributedWorkQueueWorkAssigner {
   /**
    * Initialize the queuedWork set with the work already sent out
    */
+  @Override
   protected void initializeQueuedWork() {
     if (null != queuedWork) {
       return;
@@ -98,7 +99,7 @@ public class UnorderedWorkAssigner extends DistributedWorkQueueWorkAssigner {
 
   /**
    * Distribute the work for the given path with filename
-   * 
+   *
    * @param path
    *          Path to the file being replicated
    * @param target
@@ -108,6 +109,7 @@ public class UnorderedWorkAssigner extends DistributedWorkQueueWorkAssigner {
   protected boolean queueWork(Path path, ReplicationTarget target) {
     String queueKey = DistributedWorkQueueWorkAssignerHelper.getQueueKey(path.getName(), target);
     if (queuedWork.contains(queueKey)) {
+      log.debug("{} is already queued to be replicated to {}, not re-queueing", path, target);
       return false;
     }
 


[9/9] git commit: ACCUMULO-3249 Didn't wait long enough to check for the expected condition

Posted by el...@apache.org.
ACCUMULO-3249 Didn't wait long enough to check for the expected condition


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/eaaebdf3
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/eaaebdf3
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/eaaebdf3

Branch: refs/heads/master
Commit: eaaebdf33b7d26a0977d2df252851639726abcf0
Parents: 6512bfd
Author: Josh Elser <el...@apache.org>
Authored: Thu Oct 23 12:26:15 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 23 12:26:15 2014 -0400

----------------------------------------------------------------------
 .../java/org/apache/accumulo/test/replication/ReplicationIT.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/eaaebdf3/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index e8312df..fb96d7f 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -1199,6 +1199,8 @@ public class ReplicationIT extends ConfigurableMacIT {
         if (allClosed) {
           break;
         }
+
+        UtilWaitThread.sleep(2000);
       }
 
       if (!allClosed) {
@@ -1233,7 +1235,7 @@ public class ReplicationIT extends ConfigurableMacIT {
           break;
         }
 
-        UtilWaitThread.sleep(1000);
+        UtilWaitThread.sleep(2000);
       }
 
       if (!allClosed) {


[4/9] git commit: ACCUMULO-3249 New IT which ensures that closed status messages aren't written for WALs with no records for the tablet

Posted by el...@apache.org.
ACCUMULO-3249 New IT which ensures that closed status messages aren't written for WALs with no records for the tablet


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/6512bfd5
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/6512bfd5
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/6512bfd5

Branch: refs/heads/master
Commit: 6512bfd5ef9d8c3bbb314ddcdc8d3af4bcd6f9dd
Parents: 6cbc586
Author: Josh Elser <el...@apache.org>
Authored: Wed Oct 22 19:32:24 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 23 11:18:59 2014 -0400

----------------------------------------------------------------------
 .../UnusedWalDoesntCloseReplicationStatus.java  | 223 +++++++++++++++++++
 1 file changed, 223 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/6512bfd5/test/src/test/java/org/apache/accumulo/test/replication/UnusedWalDoesntCloseReplicationStatus.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/UnusedWalDoesntCloseReplicationStatus.java b/test/src/test/java/org/apache/accumulo/test/replication/UnusedWalDoesntCloseReplicationStatus.java
new file mode 100644
index 0000000..5590f7b
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/replication/UnusedWalDoesntCloseReplicationStatus.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.StatusUtil;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.data.ServerMutation;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.accumulo.tserver.log.DfsLogger;
+import org.apache.accumulo.tserver.logger.LogEvents;
+import org.apache.accumulo.tserver.logger.LogFileKey;
+import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+
+/**
+ *
+ */
+public class UnusedWalDoesntCloseReplicationStatus extends ConfigurableMacIT {
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
+    cfg.setNumTservers(1);
+  }
+
+  @Test
+  public void test() throws Exception {
+    File accumuloDir = this.getCluster().getConfig().getAccumuloDir();
+    final Connector conn = getConnector();
+    final String tableName = getUniqueNames(1)[0];
+
+    conn.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+    conn.tableOperations().create(tableName);
+
+    final String tableId = conn.tableOperations().tableIdMap().get(tableName);
+    final int numericTableId = Integer.valueOf(tableId);
+    final int fakeTableId = numericTableId + 1;
+
+    Assert.assertNotNull("Did not find table ID", tableId);
+
+    conn.tableOperations().setProperty(tableName, Property.TABLE_REPLICATION.getKey(), "true");
+    conn.tableOperations().setProperty(tableName, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
+    // just sleep
+    conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
+        ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "50000"));
+
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    File tserverWalDir = new File(accumuloDir, ServerConstants.WAL_DIR + Path.SEPARATOR + "faketserver+port");
+    File tserverWal = new File(tserverWalDir, UUID.randomUUID().toString());
+    fs.mkdirs(new Path(tserverWalDir.getAbsolutePath()));
+
+    // Make a fake WAL with no data in it for our real table
+    FSDataOutputStream out = fs.create(new Path(tserverWal.getAbsolutePath()));
+
+    out.write(DfsLogger.LOG_FILE_HEADER_V3.getBytes(StandardCharsets.UTF_8));
+
+    DataOutputStream dos = new DataOutputStream(out);
+    dos.writeUTF("NullCryptoModule");
+
+    // Fake a single update WAL that has a mutation for another table
+    LogFileKey key = new LogFileKey();
+    LogFileValue value = new LogFileValue();
+
+    key.event = OPEN;
+    key.tserverSession = tserverWal.getAbsolutePath();
+    key.filename = tserverWal.getAbsolutePath();
+    key.write(out);
+    value.write(out);
+
+    key.event = LogEvents.DEFINE_TABLET;
+    key.tablet = new KeyExtent(new Text(Integer.toString(fakeTableId)), null, null);
+    key.seq = 1l;
+    key.tid = 1;
+
+    key.write(dos);
+    value.write(dos);
+
+    key.tablet = null;
+    key.event = LogEvents.MUTATION;
+    key.filename = tserverWal.getAbsolutePath();
+    value.mutations = Arrays.<Mutation> asList(new ServerMutation(new Text("row")));
+
+    key.write(dos);
+    value.write(dos);
+
+    key.event = LogEvents.COMPACTION_START;
+    key.filename = accumuloDir.getAbsolutePath() + "/tables/" + fakeTableId + "/t-000001/A000001.rf";
+    value.mutations = Collections.emptyList();
+
+    key.write(dos);
+    value.write(dos);
+
+    key.event = LogEvents.COMPACTION_FINISH;
+    value.mutations = Collections.emptyList();
+
+    key.write(dos);
+    value.write(dos);
+
+    dos.close();
+
+    BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+    Mutation m = new Mutation("m");
+    m.put("m", "m", "M");
+    bw.addMutation(m);
+    bw.close();
+
+    log.info("State of metadata table after inserting a record");
+
+    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.setRange(MetadataSchema.TabletsSection.getRange(tableId));
+    for (Entry<Key,Value> entry : s) {
+      System.out.println(entry.getKey().toStringNoTruncate() + " " + entry.getValue());
+    }
+
+    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.setRange(MetadataSchema.ReplicationSection.getRange());
+    for (Entry<Key,Value> entry : s) {
+      System.out.println(entry.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
+    }
+
+    log.info("Offline'ing table");
+
+    conn.tableOperations().offline(tableName, true);
+
+    // Add our fake WAL to the log column for this table
+    String walUri = tserverWal.toURI().toString();
+    KeyExtent extent = new KeyExtent(new Text(tableId), null, null);
+    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    m = new Mutation(extent.getMetadataEntry());
+    m.put(MetadataSchema.TabletsSection.LogColumnFamily.NAME, new Text("localhost:12345/" + walUri),
+        new Value((walUri + "|1").getBytes(StandardCharsets.UTF_8)));
+    bw.addMutation(m);
+
+    // Add a replication entry for our fake WAL
+    m = new Mutation(MetadataSchema.ReplicationSection.getRowPrefix() + new Path(walUri).toString());
+    m.put(MetadataSchema.ReplicationSection.COLF, new Text(tableId), new Value(StatusUtil.fileCreated(System.currentTimeMillis()).toByteArray()));
+    bw.addMutation(m);
+    bw.close();
+
+    log.info("State of metadata after injecting WAL manually");
+
+    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.setRange(MetadataSchema.TabletsSection.getRange(tableId));
+    for (Entry<Key,Value> entry : s) {
+      log.info(entry.getKey().toStringNoTruncate() + " " + entry.getValue());
+    }
+
+    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.setRange(MetadataSchema.ReplicationSection.getRange());
+    for (Entry<Key,Value> entry : s) {
+      log.info(entry.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
+    }
+
+    log.info("Bringing table online");
+    conn.tableOperations().online(tableName, true);
+
+    Assert.assertEquals(1, Iterables.size(conn.createScanner(tableName, Authorizations.EMPTY)));
+
+    log.info("Table has performed recovery, state of metadata:");
+
+    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.setRange(MetadataSchema.TabletsSection.getRange(tableId));
+    for (Entry<Key,Value> entry : s) {
+      log.info(entry.getKey().toStringNoTruncate() + " " + entry.getValue());
+    }
+
+    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.setRange(MetadataSchema.ReplicationSection.getRange());
+    for (Entry<Key,Value> entry : s) {
+      Status status = Status.parseFrom(entry.getValue().get());
+      log.info(entry.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(status));
+      Assert.assertFalse("Status record was closed and it should not be", status.getClosed());
+    }
+  }
+}


[7/9] git commit: ACCUMULO-3249 Extra comments about issuing updates to replication status for WALs.

Posted by el...@apache.org.
ACCUMULO-3249 Extra comments about issuing updates to replication status for WALs.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/6a713aa4
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/6a713aa4
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/6a713aa4

Branch: refs/heads/master
Commit: 6a713aa4282ddfd658de7af05a801f5791cd4eec
Parents: ceb131c
Author: Josh Elser <el...@apache.org>
Authored: Wed Oct 22 14:53:19 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 23 11:18:59 2014 -0400

----------------------------------------------------------------------
 .../apache/accumulo/tserver/tablet/DatafileManager.java   |  3 ++-
 .../java/org/apache/accumulo/tserver/tablet/Tablet.java   | 10 +++++++++-
 2 files changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/6a713aa4/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
index a84f092..5cdc1af 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
@@ -422,7 +422,8 @@ class DatafileManager {
 
       // Mark that we have data we want to replicate
       // This WAL could still be in use by other Tablets *from the same table*, so we can only mark that there is data to replicate,
-      // but it is *not* closed
+      // but it is *not* closed. We know it is not closed by the fact that this MinC triggered. A MinC cannot happen unless the
+      // tablet is online and thus these WALs are referenced by that tablet. Therefore, the WAL replication status cannot be 'closed'.
       if (replicate) {
         if (log.isDebugEnabled()) {
           log.debug("Recording that data has been ingested into " + tablet.getExtent() + " using " + logFileOnly);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6a713aa4/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 8ba72b7..d82827e 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -621,11 +621,19 @@ public class Tablet implements TabletCommitter {
           log.debug("No replayed mutations applied, removing unused entries for " + extent);
           MetadataTableUtil.removeUnusedWALEntries(extent, logEntries, tabletServer.getLock());
 
+          // No replication update to be made because the fact that this tablet didn't use any mutations
+          // from the WAL implies nothing about use of this WAL by other tablets. Do nothing.
+
           logEntries.clear();
         } else if (ReplicationConfigurationUtil.isEnabled(extent, tabletServer.getTableConfiguration(extent))) {
-          // The logs are about to be re-used, we need to record that they have data for this extent,
+          // The logs are about to be re-used by this tablet, we need to record that they have data for this extent,
           // but that they may get more data. logEntries is not cleared which will cause the elements
           // in logEntries to be added to the currentLogs for this Tablet below.
+          //
+          // This update serves the same purpose as an update during a MinC. We know that the WAL was defined
+          // (written when the WAL was opened) but this lets us know there are mutations written to this WAL
+          // that could potentially be replicated. Because the Tablet is using this WAL, we can be sure that
+          // the WAL isn't closed (WRT replication Status) and thus we're safe to update its progress.
           Status status = StatusUtil.openWithUnknownLength();
           for (LogEntry logEntry : logEntries) {
             log.debug("Writing updated status to metadata table for " + logEntry.logSet + " " + ProtobufUtil.toString(status));


[6/9] git commit: ACCUMULO-3249 Create work only when there is work to be done for the status.

Posted by el...@apache.org.
ACCUMULO-3249 Create work only when there is work to be done for the status.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/6cbc5863
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/6cbc5863
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/6cbc5863

Branch: refs/heads/master
Commit: 6cbc58630d3507cadf641d8c2309ee15452aa4db
Parents: 48f94bd
Author: Josh Elser <el...@apache.org>
Authored: Wed Oct 22 15:05:17 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 23 11:18:59 2014 -0400

----------------------------------------------------------------------
 .../java/org/apache/accumulo/master/replication/WorkMaker.java  | 5 ++++-
 .../org/apache/accumulo/master/replication/WorkMakerTest.java   | 5 +++--
 2 files changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/6cbc5863/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
index 0eefca3..eabcc84 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
@@ -103,7 +103,7 @@ public class WorkMaker {
           continue;
         }
 
-        // Don't create the record if we have nothing to do
+        // Don't create the record if we have nothing to do.
         // TODO put this into a filter on serverside
         if (!shouldCreateWork(status)) {
           continue;
@@ -155,6 +155,9 @@ public class WorkMaker {
    * @return Should a Work entry be created for this status
    */
   protected boolean shouldCreateWork(Status status) {
+    // Only creating work when there is work to do (regardless of closed status) is safe
+    // as long as the ReplicaSystem implementation is correctly observing
+    // that a file is completely replicated only when the file is closed
     return StatusUtil.isWorkRequired(status);
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6cbc5863/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
index 9bc3102..d486ded 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
@@ -50,7 +50,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 
 /**
- * 
+ *
  */
 public class WorkMakerTest {
 
@@ -207,6 +207,7 @@ public class WorkMakerTest {
 
     Assert.assertFalse(workMaker.shouldCreateWork(StatusUtil.fileCreated(System.currentTimeMillis())));
     Assert.assertTrue(workMaker.shouldCreateWork(StatusUtil.ingestedUntil(1000)));
-    Assert.assertTrue(workMaker.shouldCreateWork(Status.newBuilder().setBegin(Long.MAX_VALUE).setEnd(0).setInfiniteEnd(true).setClosed(true).build()));
+    // We don't need to re-create work for something that's already replicated.
+    Assert.assertFalse(workMaker.shouldCreateWork(Status.newBuilder().setBegin(Long.MAX_VALUE).setEnd(0).setInfiniteEnd(true).setClosed(true).build()));
   }
 }


[2/9] git commit: ACCUMULO-3249 Clarify some comments

Posted by el...@apache.org.
ACCUMULO-3249 Clarify some comments


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d75be63c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d75be63c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d75be63c

Branch: refs/heads/master
Commit: d75be63cf3e11e4d73289ed2adcf105d2bfc23fc
Parents: 24921c7
Author: Josh Elser <el...@apache.org>
Authored: Wed Oct 22 13:14:57 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 23 11:18:33 2014 -0400

----------------------------------------------------------------------
 .../src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java  | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/d75be63c/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index caff246..85af110 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -633,7 +633,8 @@ public class Tablet implements TabletCommitter {
           logEntries.clear();
         } else if (ReplicationConfigurationUtil.isEnabled(extent, tabletServer.getTableConfiguration(extent))) {
           // The logs are about to be re-used, we need to record that they have data for this extent,
-          // but that they may get more data
+          // but that they may get more data. logEntries is not cleared which will cause the elements
+          // in logEntries to be added to the currentLogs for this Tablet below.
           Status status = StatusUtil.openWithUnknownLength();
           for (LogEntry logEntry : logEntries) {
             log.debug("Writing updated status to metadata table for " + logEntry.logSet + " " + ProtobufUtil.toString(status));


[5/9] git commit: ACCUMULO-3249 When recovery of a tablet doesn't recover anything from a WAL, we must ignore it.

Posted by el...@apache.org.
ACCUMULO-3249 When recovery of a tablet doesn't recover anything from a WAL, we must ignore it.

Marking a WAL as closed when recovery doesn't use any mutations from the WAL
is incorrect. Just because the current Tablet didn't find any mutations
in this WAL doesn't mean that other Tablets won't find any mutations in it.
Even worse, other Tablets could *continue to use* this WAL for new data.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ceb131cb
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ceb131cb
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ceb131cb

Branch: refs/heads/master
Commit: ceb131cb0702dddd01a62de5d4d1fea40b9cb172
Parents: ebc4a04
Author: Josh Elser <el...@apache.org>
Authored: Wed Oct 22 14:22:15 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 23 11:18:59 2014 -0400

----------------------------------------------------------------------
 .../java/org/apache/accumulo/tserver/tablet/Tablet.java     | 9 ---------
 1 file changed, 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/ceb131cb/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 85af110..8ba72b7 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -621,15 +621,6 @@ public class Tablet implements TabletCommitter {
           log.debug("No replayed mutations applied, removing unused entries for " + extent);
           MetadataTableUtil.removeUnusedWALEntries(extent, logEntries, tabletServer.getLock());
 
-          // Ensure that we write a record marking each WAL as requiring replication to make sure we don't abandon the data
-          if (ReplicationConfigurationUtil.isEnabled(extent, tabletServer.getTableConfiguration(extent))) {
-            Status status = StatusUtil.fileClosed();
-            for (LogEntry logEntry : logEntries) {
-              log.debug("Writing closed status to metadata table for " + logEntry.logSet + " " + ProtobufUtil.toString(status));
-              ReplicationTableUtil.updateFiles(SystemCredentials.get(), extent, logEntry.logSet, status);
-            }
-          }
-
           logEntries.clear();
         } else if (ReplicationConfigurationUtil.isEnabled(extent, tabletServer.getTableConfiguration(extent))) {
           // The logs are about to be re-used, we need to record that they have data for this extent,


[8/9] git commit: ACCUMULO-3249 Work should be created regardless of open or closed

Posted by el...@apache.org.
ACCUMULO-3249 Work should be created regardless of open or closed


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/48f94bd2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/48f94bd2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/48f94bd2

Branch: refs/heads/master
Commit: 48f94bd2905e04b8df0051d57d019684473a3461
Parents: 6a713aa
Author: Josh Elser <el...@apache.org>
Authored: Wed Oct 22 15:01:49 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 23 11:18:59 2014 -0400

----------------------------------------------------------------------
 .../java/org/apache/accumulo/master/replication/WorkMaker.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/48f94bd2/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
index da17bba..0eefca3 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
@@ -155,7 +155,7 @@ public class WorkMaker {
    * @return Should a Work entry be created for this status
    */
   protected boolean shouldCreateWork(Status status) {
-    return status.getClosed() || StatusUtil.isWorkRequired(status);
+    return StatusUtil.isWorkRequired(status);
   }
 
   protected void addWorkRecord(Text file, Value v, Map<String,String> targets, String sourceTableId) {