You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/21 03:59:53 UTC
[34/50] [abbrv] git commit: ACCUMULO-2799 Split the drain methods to
fetch the set of files needing replication and then waiting on those files
ACCUMULO-2799 Split the drain methods to fetch the set of files needing replication and then waiting on those files
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/26a88b4a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/26a88b4a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/26a88b4a
Branch: refs/heads/ACCUMULO-378
Commit: 26a88b4a34a47693607a177838d335e1fb85ea2f
Parents: eb4b9ce
Author: Josh Elser <el...@apache.org>
Authored: Mon May 19 15:39:25 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon May 19 15:39:25 2014 -0400
----------------------------------------------------------------------
.../client/admin/ReplicationOperations.java | 17 +++
.../client/impl/ReplicationOperationsImpl.java | 110 ++++++++++++-------
2 files changed, 90 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/26a88b4a/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
index 4fd7d07..1680732 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
@@ -16,6 +16,8 @@
*/
package org.apache.accumulo.core.client.admin;
+import java.util.Set;
+
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.TableNotFoundException;
@@ -58,4 +60,19 @@ public interface ReplicationOperations {
* @throws AccumuloSecurityException
*/
public void drain(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException;
+
+ /**
+ * Wait for a table to be fully replicated as determined by the provided files
+ * @param tableName The table to wait for
+ * @throws AccumuloException
+ * @throws AccumuloSecurityException
+ */
+ public void drain(String tableName, Set<String> files) throws AccumuloException, AccumuloSecurityException, TableNotFoundException;
+
+ /**
+ * Get all of the referenced files for a table
+ * @param tableName The table to collect the referenced files for
+ * @throws TableNotFoundException
+ */
+ public Set<String> referencedFiles(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/26a88b4a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
index d2698bd..752952d 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
@@ -115,7 +115,14 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
public void drain(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
checkNotNull(tableName);
- log.debug("Waiting to drain {}", tableName);
+ Set<String> wals = referencedFiles(tableName);
+
+ drain(tableName, wals);
+ }
+
+ @Override
+ public void drain(String tableName, Set<String> wals) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+ checkNotNull(tableName);
Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
TableOperations tops = conn.tableOperations();
@@ -124,7 +131,7 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
}
if (!conn.tableOperations().exists(tableName)) {
- throw new IllegalArgumentException("Table does not exist: " + tableName);
+ throw new TableNotFoundException(null, tableName, null);
}
String strTableId = null;
@@ -137,41 +144,6 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
Text tableId = new Text(strTableId);
- log.debug("Found {} id for {}", strTableId, tableName);
-
- // Get the WALs currently referenced by the table
- BatchScanner metaBs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
- metaBs.setRanges(Collections.singleton(MetadataSchema.TabletsSection.getRange(strTableId)));
- metaBs.fetchColumnFamily(LogColumnFamily.NAME);
- Set<String> wals = new HashSet<>();
- try {
- for (Entry<Key,Value> entry : metaBs) {
- LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
- for (String log : logEntry.logSet) {
- wals.add(new Path(log).toString());
- }
- }
- } finally {
- metaBs.close();
- }
-
- // And the WALs that need to be replicated for this table
- metaBs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
- metaBs.setRanges(Collections.singleton(ReplicationSection.getRange()));
- metaBs.fetchColumnFamily(ReplicationSection.COLF);
- try {
- Text buffer = new Text();
- for (Entry<Key,Value> entry : metaBs) {
- ReplicationSection.getTableId(entry.getKey(), buffer);
- if (buffer.equals(tableId)) {
- ReplicationSection.getFile(entry.getKey(), buffer);
- wals.add(buffer.toString());
- }
- }
- } finally {
- metaBs.close();
- }
-
log.info("Waiting for {} to be replicated for {}", wals, tableId);
log.info("Reading from metadata table");
@@ -253,4 +225,68 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
return true;
}
+
+ @Override
+ public Set<String> referencedFiles(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+ checkNotNull(tableName);
+
+ log.debug("Collecting referenced files for replication of table {}", tableName);
+
+ Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+ TableOperations tops = conn.tableOperations();
+ while (!tops.exists(ReplicationTable.NAME)) {
+ UtilWaitThread.sleep(200);
+ }
+
+ if (!conn.tableOperations().exists(tableName)) {
+ throw new TableNotFoundException(null, tableName, null);
+ }
+
+ String strTableId = null;
+ while (null == strTableId) {
+ strTableId = tops.tableIdMap().get(tableName);
+ if (null == strTableId) {
+ UtilWaitThread.sleep(200);
+ }
+ }
+
+ Text tableId = new Text(strTableId);
+
+ log.debug("Found id of {} for name {}", strTableId, tableName);
+
+ // Get the WALs currently referenced by the table
+ BatchScanner metaBs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
+ metaBs.setRanges(Collections.singleton(MetadataSchema.TabletsSection.getRange(strTableId)));
+ metaBs.fetchColumnFamily(LogColumnFamily.NAME);
+ Set<String> wals = new HashSet<>();
+ try {
+ for (Entry<Key,Value> entry : metaBs) {
+ LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
+ for (String log : logEntry.logSet) {
+ wals.add(new Path(log).toString());
+ }
+ }
+ } finally {
+ metaBs.close();
+ }
+
+ // And the WALs that need to be replicated for this table
+ metaBs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
+ metaBs.setRanges(Collections.singleton(ReplicationSection.getRange()));
+ metaBs.fetchColumnFamily(ReplicationSection.COLF);
+ try {
+ Text buffer = new Text();
+ for (Entry<Key,Value> entry : metaBs) {
+ ReplicationSection.getTableId(entry.getKey(), buffer);
+ if (buffer.equals(tableId)) {
+ ReplicationSection.getFile(entry.getKey(), buffer);
+ wals.add(buffer.toString());
+ }
+ }
+ } finally {
+ metaBs.close();
+ }
+
+ return wals;
+ }
}