You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/21 03:59:53 UTC

[34/50] [abbrv] git commit: ACCUMULO-2799 Split the drain methods to fetch the set of files needing replication and then wait on those files

ACCUMULO-2799 Split the drain methods to fetch the set of files needing replication and then wait on those files


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/26a88b4a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/26a88b4a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/26a88b4a

Branch: refs/heads/ACCUMULO-378
Commit: 26a88b4a34a47693607a177838d335e1fb85ea2f
Parents: eb4b9ce
Author: Josh Elser <el...@apache.org>
Authored: Mon May 19 15:39:25 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon May 19 15:39:25 2014 -0400

----------------------------------------------------------------------
 .../client/admin/ReplicationOperations.java     |  17 +++
 .../client/impl/ReplicationOperationsImpl.java  | 110 ++++++++++++-------
 2 files changed, 90 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/26a88b4a/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
index 4fd7d07..1680732 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
@@ -16,6 +16,8 @@
  */
 package org.apache.accumulo.core.client.admin;
 
+import java.util.Set;
+
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.TableNotFoundException;
@@ -58,4 +60,19 @@ public interface ReplicationOperations {
    * @throws AccumuloSecurityException
    */
   public void drain(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException;
+
+  /**
+   * Wait for a table to be fully replicated, as determined by the provided set of files
+   * @param tableName The table to wait for
+   * @throws AccumuloException
+   * @throws AccumuloSecurityException
+   */
+  public void drain(String tableName, Set<String> files) throws AccumuloException, AccumuloSecurityException, TableNotFoundException;
+
+  /**
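+   * Get all of the files (write-ahead logs) currently referenced by a table or still pending replication for it
+   * @param tableName The table whose referenced files should be collected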
+   * @throws TableNotFoundException
+   */
+  public Set<String> referencedFiles(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException;
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/26a88b4a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
index d2698bd..752952d 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
@@ -115,7 +115,14 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
   public void drain(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
     checkNotNull(tableName);
 
-    log.debug("Waiting to drain {}", tableName);
+    Set<String> wals = referencedFiles(tableName);
+
+    drain(tableName, wals);
+  }
+
+  @Override
+  public void drain(String tableName, Set<String> wals) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    checkNotNull(tableName);
 
     Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
     TableOperations tops = conn.tableOperations();
@@ -124,7 +131,7 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
     }
 
     if (!conn.tableOperations().exists(tableName)) {
-      throw new IllegalArgumentException("Table does not exist: " + tableName);
+      throw new TableNotFoundException(null, tableName, null);
     }
 
     String strTableId = null;
@@ -137,41 +144,6 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
 
     Text tableId = new Text(strTableId);
 
-    log.debug("Found {} id for {}", strTableId, tableName);
-
-    // Get the WALs currently referenced by the table
-    BatchScanner metaBs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4); 
-    metaBs.setRanges(Collections.singleton(MetadataSchema.TabletsSection.getRange(strTableId)));
-    metaBs.fetchColumnFamily(LogColumnFamily.NAME);
-    Set<String> wals = new HashSet<>();
-    try {
-      for (Entry<Key,Value> entry : metaBs) {
-        LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
-        for (String log : logEntry.logSet) {
-          wals.add(new Path(log).toString());
-        }
-      }
-    } finally {
-      metaBs.close();
-    }
-
-    // And the WALs that need to be replicated for this table
-    metaBs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
-    metaBs.setRanges(Collections.singleton(ReplicationSection.getRange()));
-    metaBs.fetchColumnFamily(ReplicationSection.COLF);
-    try {
-      Text buffer = new Text();
-      for (Entry<Key,Value> entry : metaBs) {
-        ReplicationSection.getTableId(entry.getKey(), buffer);
-        if (buffer.equals(tableId)) {
-          ReplicationSection.getFile(entry.getKey(), buffer);
-          wals.add(buffer.toString());
-        }
-      }
-    } finally {
-      metaBs.close();
-    }
-
     log.info("Waiting for {} to be replicated for {}", wals, tableId);
 
     log.info("Reading from metadata table");
@@ -253,4 +225,68 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
 
     return true;
   }
+
+  @Override
+  public Set<String> referencedFiles(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    checkNotNull(tableName);
+
+    log.debug("Collecting referenced files for replication of table {}", tableName);
+
+    Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+    TableOperations tops = conn.tableOperations();
+    while (!tops.exists(ReplicationTable.NAME)) {
+      UtilWaitThread.sleep(200);
+    }
+
+    if (!conn.tableOperations().exists(tableName)) {
+      throw new TableNotFoundException(null, tableName, null);
+    }
+
+    String strTableId = null;
+    while (null == strTableId) {
+      strTableId = tops.tableIdMap().get(tableName);
+      if (null == strTableId) {
+        UtilWaitThread.sleep(200);
+      }
+    }
+
+    Text tableId = new Text(strTableId);
+
+    log.debug("Found id of {} for name {}", strTableId, tableName);
+
+    // Get the WALs currently referenced by the table
+    BatchScanner metaBs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4); 
+    metaBs.setRanges(Collections.singleton(MetadataSchema.TabletsSection.getRange(strTableId)));
+    metaBs.fetchColumnFamily(LogColumnFamily.NAME);
+    Set<String> wals = new HashSet<>();
+    try {
+      for (Entry<Key,Value> entry : metaBs) {
+        LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
+        for (String log : logEntry.logSet) {
+          wals.add(new Path(log).toString());
+        }
+      }
+    } finally {
+      metaBs.close();
+    }
+
+    // And the WALs that need to be replicated for this table
+    metaBs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
+    metaBs.setRanges(Collections.singleton(ReplicationSection.getRange()));
+    metaBs.fetchColumnFamily(ReplicationSection.COLF);
+    try {
+      Text buffer = new Text();
+      for (Entry<Key,Value> entry : metaBs) {
+        ReplicationSection.getTableId(entry.getKey(), buffer);
+        if (buffer.equals(tableId)) {
+          ReplicationSection.getFile(entry.getKey(), buffer);
+          wals.add(buffer.toString());
+        }
+      }
+    } finally {
+      metaBs.close();
+    }
+
+    return wals;
+  }
 }