You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/28 20:56:31 UTC

[1/4] git commit: ACCUMULO-378 Jenkins fix -- retry if we get a security exception because the grant on the replication table hasn't happened yet

Repository: accumulo
Updated Branches:
  refs/heads/ACCUMULO-378 070ceb1da -> 1f0ee9c50


ACCUMULO-378 Jenkins fix -- retry if we get a security exception because the grant on the replication table hasn't happened yet


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/03d57520
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/03d57520
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/03d57520

Branch: refs/heads/ACCUMULO-378
Commit: 03d57520a9668fb0d82b62708096ce6b595b0cdc
Parents: 070ceb1
Author: Josh Elser <el...@apache.org>
Authored: Wed May 28 11:31:34 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed May 28 11:31:34 2014 -0400

----------------------------------------------------------------------
 .../test/replication/ReplicationTest.java       | 85 ++++++++++++--------
 1 file changed, 50 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/03d57520/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
index b59f8da..51e4e46 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
@@ -680,30 +680,44 @@ public class ReplicationTest extends ConfigurableMacIT {
         log.info(entry.getKey().toStringNoTruncate() + "=" + entry.getValue());
       }
 
-      s = ReplicationTable.getScanner(conn);
-      StatusSection.limit(s);
-      Text buff = new Text();
-      boolean allReferencedLogsClosed = true;
-      int recordsFound = 0;
-      for (Entry<Key,Value> e : s) {
-        recordsFound++;
-        allReferencedLogsClosed = true;
-        StatusSection.getFile(e.getKey(), buff);
-        String file = buff.toString();
-        if (wals.contains(file)) {
-          Status stat = Status.parseFrom(e.getValue().get());
-          if (!stat.getClosed()) {
-            log.info("{} wasn't closed", file);
-            allReferencedLogsClosed = false;
+      try {
+        s = ReplicationTable.getScanner(conn);
+        StatusSection.limit(s);
+        Text buff = new Text();
+        boolean allReferencedLogsClosed = true;
+        int recordsFound = 0;
+        for (Entry<Key,Value> e : s) {
+          recordsFound++;
+          allReferencedLogsClosed = true;
+          StatusSection.getFile(e.getKey(), buff);
+          String file = buff.toString();
+          if (wals.contains(file)) {
+            Status stat = Status.parseFrom(e.getValue().get());
+            if (!stat.getClosed()) {
+              log.info("{} wasn't closed", file);
+              allReferencedLogsClosed = false;
+            }
           }
         }
-      }
 
-      if (recordsFound > 0 && allReferencedLogsClosed) {
-        return;
+        if (recordsFound > 0 && allReferencedLogsClosed) {
+          return;
+        }
+        Thread.sleep(1000);
+      } catch (RuntimeException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof AccumuloSecurityException) {
+          AccumuloSecurityException ase = (AccumuloSecurityException) cause;
+          switch (ase.getSecurityErrorCode()) {
+            case PERMISSION_DENIED:
+              // We tried to read the replication table before the GRANT went through
+              Thread.sleep(1000);
+              break;
+            default:
+              throw e;
+          }
+        }
       }
-
-      Thread.sleep(1000);
     }
 
     Assert.fail("We had a file that was referenced but didn't get closed");
@@ -817,7 +831,8 @@ public class ReplicationTest extends ConfigurableMacIT {
 
     Assert.assertNotNull("Could not find expected entry in replication table", entry);
     Status actual = Status.parseFrom(entry.getValue().get());
-    Assert.assertTrue("Expected to find a replication entry that is open with infinite length: " + ProtobufUtil.toString(actual), !actual.getClosed() && actual.getInfiniteEnd());
+    Assert.assertTrue("Expected to find a replication entry that is open with infinite length: " + ProtobufUtil.toString(actual),
+        !actual.getClosed() && actual.getInfiniteEnd());
 
     // Try a couple of times to watch for the work record to be created
     boolean notFound = true;
@@ -1011,7 +1026,7 @@ public class ReplicationTest extends ConfigurableMacIT {
     });
 
     t.start();
-    
+
     String table1 = "table1", table2 = "table2", table3 = "table3";
 
     BatchWriter bw;
@@ -1021,7 +1036,7 @@ public class ReplicationTest extends ConfigurableMacIT {
       conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
       conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
           ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, null));
-  
+
       // Write some data to table1
       bw = conn.createBatchWriter(table1, new BatchWriterConfig());
       for (int rows = 0; rows < 200; rows++) {
@@ -1032,13 +1047,13 @@ public class ReplicationTest extends ConfigurableMacIT {
         }
         bw.addMutation(m);
       }
-  
+
       bw.close();
-  
+
       conn.tableOperations().create(table2);
       conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
       conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
-  
+
       // Write some data to table2
       bw = conn.createBatchWriter(table2, new BatchWriterConfig());
       for (int rows = 0; rows < 200; rows++) {
@@ -1049,13 +1064,13 @@ public class ReplicationTest extends ConfigurableMacIT {
         }
         bw.addMutation(m);
       }
-  
+
       bw.close();
-  
+
       conn.tableOperations().create(table3);
       conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
       conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
-  
+
       // Write some data to table3
       bw = conn.createBatchWriter(table3, new BatchWriterConfig());
       for (int rows = 0; rows < 200; rows++) {
@@ -1066,9 +1081,9 @@ public class ReplicationTest extends ConfigurableMacIT {
         }
         bw.addMutation(m);
       }
-  
+
       bw.close();
-  
+
       // Flush everything to try to make the replication records
       for (String table : Arrays.asList(table1, table2, table3)) {
         conn.tableOperations().compact(table, null, null, true, true);
@@ -1191,8 +1206,8 @@ public class ReplicationTest extends ConfigurableMacIT {
     // replication shouldn't exist when we begin
     Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
 
-//    ReplicationTablesPrinterThread thread = new ReplicationTablesPrinterThread(conn, System.out);
-//    thread.start();
+    // ReplicationTablesPrinterThread thread = new ReplicationTablesPrinterThread(conn, System.out);
+    // thread.start();
 
     try {
       // Create two tables
@@ -1395,8 +1410,8 @@ public class ReplicationTest extends ConfigurableMacIT {
 
       Assert.assertEquals("Found unexpected replication records in the replication table", 0, recordsFound);
     } finally {
-//      thread.interrupt();
-//      thread.join(5000);
+      // thread.interrupt();
+      // thread.join(5000);
     }
   }
 }


[3/4] git commit: ACCUMULO-378 Try to reduce the spam in the logs when the peer is unavailable.

Posted by el...@apache.org.
ACCUMULO-378 Try to reduce the spam in the logs when the peer is unavailable.

Increase the sleep time linearly with each failed attempt to talk to the replication
coordinator. After a bounded number of attempts, bail out completely and let the
replication process take over again.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/27905426
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/27905426
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/27905426

Branch: refs/heads/ACCUMULO-378
Commit: 27905426f5a0a3f5c140d1ae1b0c00497e6ad0bb
Parents: 5e8d6d2
Author: Josh Elser <el...@apache.org>
Authored: Wed May 28 11:45:57 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed May 28 11:45:57 2014 -0400

----------------------------------------------------------------------
 .../apache/accumulo/core/client/impl/ReplicationClient.java   | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/27905426/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
index 02ae3d0..d7b12c7 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
@@ -51,17 +51,18 @@ public class ReplicationClient {
    *          Instance for the peer replicant
    * @return Client to the ReplicationCoordinator service
    */
-  public static ReplicationCoordinator.Client getCoordinatorConnectionWithRetry(Instance instance) {
+  public static ReplicationCoordinator.Client getCoordinatorConnectionWithRetry(Instance instance) throws AccumuloException {
     checkArgument(instance != null, "instance is null");
 
-    while (true) {
+    for (int attempts = 1; attempts <= 10; attempts++) {
 
       ReplicationCoordinator.Client result = getCoordinatorConnection(instance);
       if (result != null)
         return result;
-      UtilWaitThread.sleep(250);
+      UtilWaitThread.sleep(attempts * 250);
     }
 
+    throw new AccumuloException("Timed out trying to communicate with master from " + instance.getInstanceName());
   }
 
   public static ReplicationCoordinator.Client getCoordinatorConnection(Instance instance) {


[2/4] git commit: ACCUMULO-378 Removing experimental annotation.

Posted by el...@apache.org.
ACCUMULO-378 Removing experimental annotation.

When I discussed this with [~ctubbsii], he stated that the intent of the Experimental annotation
is to mark features that are incomplete or not expected to fully work. Replication is
not one of those features (it is tested and expected to work for its known functionality);
therefore, I'm removing the Experimental annotation.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5e8d6d2c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5e8d6d2c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5e8d6d2c

Branch: refs/heads/ACCUMULO-378
Commit: 5e8d6d2c7fdfd41a99ab812500eaf92661ba8481
Parents: 03d5752
Author: Josh Elser <el...@apache.org>
Authored: Wed May 28 11:44:49 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed May 28 11:44:49 2014 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java | 26 +-------------------
 1 file changed, 1 insertion(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/5e8d6d2c/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 6afa956..8409781 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -179,14 +179,10 @@ public enum Property {
       "A class that implements a mechansim to steal write access to a file"),
   MASTER_FATE_THREADPOOL_SIZE("master.fate.threadpool.size", "4", PropertyType.COUNT,
       "The number of threads used to run FAult-Tolerant Executions. These are primarily table operations like merge."),
-  @Experimental
   MASTER_REPLICATION_SCAN_INTERVAL("master.replication.status.scan.interval", "30s", PropertyType.TIMEDURATION,
       "Amount of time to sleep before scanning the status section of the replication table for new data"),
-  @Experimental
   MASTER_REPLICATION_COORDINATOR_PORT("master.replication.coordinator.port", "10001", PropertyType.PORT, "Port for the replication coordinator service"),
-  @Experimental
   MASTER_REPLICATION_COORDINATOR_MINTHREADS("master.replication.coordinator.minthreads", "4", PropertyType.COUNT, "Minimum number of threads dedicated to answering coordinator requests"),
-  @Experimental
   MASTER_REPLICATION_COORDINATOR_THREADCHECK("master.replication.coordinator.threadcheck.time", "5s", PropertyType.TIMEDURATION, "The time between adjustments of the coordinator thread pool"),
 
   // properties that are specific to tablet server behavior
@@ -276,12 +272,9 @@ public enum Property {
       "The number of threads for the distributed work queue. These threads are used for copying failed bulk files."),
   TSERV_WAL_SYNC("tserver.wal.sync", "true", PropertyType.BOOLEAN,
       "Use the SYNC_BLOCK create flag to sync WAL writes to disk. Prevents problems recovering from sudden system resets."),
-  @Experimental
   TSERV_REPLICATION_REPLAYERS("tserver.replication.replayer.", null, PropertyType.PREFIX, "Allows configuration of implementation used to apply replicated data"),
-  @Experimental
   TSERV_REPLICATION_DEFAULT_HANDLER("tserver.replication.default.replayer", "org.apache.accumulo.tserver.replication.BatchWriterReplicationReplayer",
       PropertyType.CLASSNAME, "Default AccumuloReplicationReplayer implementation"),
-  @Experimental
   TSERV_REPLICATION_BW_REPLAYER_MEMORY("tserver.replication.batchwriter.replayer.memory", "50M", PropertyType.MEMORY, "Memory to provide to batchwriter to replay mutations for replication"),
 
   // properties that are specific to logger server behavior
@@ -432,9 +425,7 @@ public enum Property {
       "A customizable major compaction strategy."),
   TABLE_COMPACTION_STRATEGY_PREFIX("table.majc.compaction.strategy.opts.", null, PropertyType.PREFIX,
       "Properties in this category are used to configure the compaction strategy."),
-  @Experimental
   TABLE_REPLICATION("table.replication", "false", PropertyType.BOOLEAN, "Is replication enabled for the given table"),
-  @Experimental
   TABLE_REPLICATION_TARGETS("table.replication.target.", null, PropertyType.PREFIX, "Enumerate a mapping of other systems which this table should " +
       "replicate their data to. The key suffix is the identifying cluster name and the value is an identifier for a location on the target system, " +
       "e.g. the ID of the table on the target to replicate to"),
@@ -459,39 +450,24 @@ public enum Property {
   GENERAL_MAVEN_PROJECT_BASEDIR(AccumuloClassLoader.MAVEN_PROJECT_BASEDIR_PROPERTY_NAME, AccumuloClassLoader.DEFAULT_MAVEN_PROJECT_BASEDIR_VALUE,
       PropertyType.ABSOLUTEPATH, "Set this to automatically add maven target/classes directories to your dynamic classpath"),
 
-  @Experimental
+  // General properties for configuring replication
   REPLICATION_PREFIX("replication.", null, PropertyType.PREFIX, "Properties in this category affect the replication of data to other Accumulo instances."),
-  @Experimental
   REPLICATION_PEERS("replication.peer.", null, PropertyType.PREFIX, "Properties in this category control what systems data can be replicated to"),
-  @Experimental
   REPLICATION_PEER_USER("replication.peer.user.", null, PropertyType.PREFIX, "The username to provide when authenticating with the given peer"),
-  @Experimental
   @Sensitive
   REPLICATION_PEER_PASSWORD("replication.peer.password.", null, PropertyType.PREFIX, "The password to provide when authenticating with the given peer"),
-  @Experimental
   REPLICATION_NAME("replication.name", "", PropertyType.STRING, "Name of this cluster with respect to replication. Used to identify this instance from other peers"),
-  @Experimental
   REPLICATION_MAX_WORK_QUEUE("replication.max.work.queue", "20000000", PropertyType.COUNT, "Upper bound of the number of files queued for replication"),
-  @Experimental
   REPLICATION_WORK_ASSIGNMENT_SLEEP("replication.work.assignment.sleep", "30s", PropertyType.TIMEDURATION, "Amount of time to sleep between replication work assignment"),
-  @Experimental
   REPLICATION_WORKER_THREADS("replication.worker.threads", "4", PropertyType.COUNT, "Size of the threadpool that each tabletserver devotes to replicating data"),
-  @Experimental
   REPLICATION_RECEIPT_SERVICE_PORT("replication.receipt.service.port", "10002", PropertyType.PORT, "Listen port used by thrift service in tserver listening for replication"),
-  @Experimental
   REPLICATION_WORK_ATTEMPTS("replication.work.attempts", "10", PropertyType.COUNT, "Number of attempts to try to replicate some data before giving up and letting it naturally be retried later"),
-  @Experimental
   REPLICATION_MIN_THREADS("replication.receiver.min.threads", "1", PropertyType.COUNT, "Minimum number of threads for replciation"),
-  @Experimental
   REPLICATION_THREADCHECK("replication.receiver.threadcheck.time", "5s", PropertyType.TIMEDURATION, "The time between adjustments of the replication thread pool."),
-  @Experimental
   REPLICATION_MAX_UNIT_SIZE("replication.max.unit.size", "64M", PropertyType.MEMORY, "Maximum size of data to send in a replication message"),
-  @Experimental
   REPLICATION_WORK_ASSIGNER("replication.work.assigner", "org.apache.accumulo.master.replication.SequentialWorkAssigner", PropertyType.CLASSNAME,
       "Replication WorkAssigner implementation to use"),
-  @Experimental
   REPLICATION_WORK_PROCESSOR_DELAY("replication.work.processor.delay", "0s", PropertyType.TIMEDURATION, "Amount of time to wait before first checking for replication work"),
-  @Experimental
   REPLICATION_WORK_PROCESSOR_PERIOD("replication.work.processor.period", "0s", PropertyType.TIMEDURATION, "Amount of time to wait before re-checking for replication work"),
 
   ;


[4/4] git commit: ACCUMULO-378 Set a more realistic maximum size on the distributed work queue for work assignment.

Posted by el...@apache.org.
ACCUMULO-378 Set a more realistic maximum size on the distributed work queue for work assignment.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1f0ee9c5
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1f0ee9c5
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1f0ee9c5

Branch: refs/heads/ACCUMULO-378
Commit: 1f0ee9c5027714997a04c5cacf4ef92fff27ecc1
Parents: 2790542
Author: Josh Elser <el...@apache.org>
Authored: Wed May 28 11:57:45 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed May 28 11:57:45 2014 -0400

----------------------------------------------------------------------
 core/src/main/java/org/apache/accumulo/core/conf/Property.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/1f0ee9c5/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 8409781..2c7e27b 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -457,7 +457,7 @@ public enum Property {
   @Sensitive
   REPLICATION_PEER_PASSWORD("replication.peer.password.", null, PropertyType.PREFIX, "The password to provide when authenticating with the given peer"),
   REPLICATION_NAME("replication.name", "", PropertyType.STRING, "Name of this cluster with respect to replication. Used to identify this instance from other peers"),
-  REPLICATION_MAX_WORK_QUEUE("replication.max.work.queue", "20000000", PropertyType.COUNT, "Upper bound of the number of files queued for replication"),
+  REPLICATION_MAX_WORK_QUEUE("replication.max.work.queue", "1000", PropertyType.COUNT, "Upper bound of the number of files queued for replication"),
   REPLICATION_WORK_ASSIGNMENT_SLEEP("replication.work.assignment.sleep", "30s", PropertyType.TIMEDURATION, "Amount of time to sleep between replication work assignment"),
   REPLICATION_WORKER_THREADS("replication.worker.threads", "4", PropertyType.COUNT, "Size of the threadpool that each tabletserver devotes to replicating data"),
   REPLICATION_RECEIPT_SERVICE_PORT("replication.receipt.service.port", "10002", PropertyType.PORT, "Listen port used by thrift service in tserver listening for replication"),