You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/10/23 04:42:06 UTC
[1/3] git commit: ACCUMULO-1585 add guava to tool.sh
Updated Branches:
refs/heads/master 86669f733 -> 7f6e51227
ACCUMULO-1585 add guava to tool.sh
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/839d689f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/839d689f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/839d689f
Branch: refs/heads/master
Commit: 839d689f67a53e1b34e01d69a989232d289c8bf8
Parents: 388f87d
Author: Keith Turner <kt...@apache.org>
Authored: Tue Oct 22 20:59:41 2013 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Tue Oct 22 22:40:23 2013 -0400
----------------------------------------------------------------------
bin/tool.sh | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/839d689f/bin/tool.sh
----------------------------------------------------------------------
diff --git a/bin/tool.sh b/bin/tool.sh
index cdcf6ae..376983f 100755
--- a/bin/tool.sh
+++ b/bin/tool.sh
@@ -50,6 +50,7 @@ THRIFT_LIB="$LIB/libthrift.jar"
TRACE_LIB="$LIB/accumulo-trace.jar"
JCOMMANDER_LIB="$LIB/jcommander.jar"
COMMONS_VFS_LIB="$LIB/commons-vfs2.jar"
+GUAVA_LIB="$LIB/guava.jar"
USERJARS=" "
for arg in "$@"; do
@@ -70,8 +71,8 @@ for arg in "$@"; do
fi
done
-LIB_JARS="$THRIFT_LIB,$CORE_LIB,$FATE_LIB,$ZOOKEEPER_LIB,$TRACE_LIB,$JCOMMANDER_LIB,$COMMONS_VFS_LIB"
-H_JARS="$THRIFT_LIB:$CORE_LIB:$FATE_LIB:$ZOOKEEPER_LIB:$TRACE_LIB:$JCOMMANDER_LIB:$COMMONS_VFS_LIB"
+LIB_JARS="$THRIFT_LIB,$CORE_LIB,$FATE_LIB,$ZOOKEEPER_LIB,$TRACE_LIB,$JCOMMANDER_LIB,$COMMONS_VFS_LIB,$GUAVA_LIB"
+H_JARS="$THRIFT_LIB:$CORE_LIB:$FATE_LIB:$ZOOKEEPER_LIB:$TRACE_LIB:$JCOMMANDER_LIB:$COMMONS_VFS_LIB:$GUAVA_LIB"
for jar in $USERJARS; do
LIB_JARS="$LIB_JARS,$jar"
[3/3] git commit: ACCUMULO-1732 Input format changes : used table id
exclusively, fixed issues with table propagation, removed some duplicated code
Posted by kt...@apache.org.
ACCUMULO-1732 Input format changes : used table id exclusively, fixed issues with table propagation, removed some duplicated code
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7f6e5122
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7f6e5122
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7f6e5122
Branch: refs/heads/master
Commit: 7f6e512278a365c9bbb525c8d8fee57e5d573d24
Parents: 839d689
Author: Keith Turner <kt...@apache.org>
Authored: Tue Oct 22 22:38:50 2013 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Tue Oct 22 22:40:24 2013 -0400
----------------------------------------------------------------------
.../core/client/mapred/AbstractInputFormat.java | 138 +++----------------
.../client/mapreduce/AbstractInputFormat.java | 133 +++---------------
.../mapreduce/lib/util/InputConfigurator.java | 116 +++++++++++++++-
.../simple/mapreduce/UniqueColumns.java | 7 +-
4 files changed, 152 insertions(+), 242 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7f6e5122/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
index d474c85..c89c5d7 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
@@ -31,12 +31,12 @@ import org.apache.accumulo.core.client.ClientSideIteratorScanner;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.IsolatedScanner;
-import org.apache.accumulo.core.client.RowIterator;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableDeletedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.impl.OfflineScanner;
+import org.apache.accumulo.core.client.impl.ScannerImpl;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.TabletLocator;
import org.apache.accumulo.core.client.mapreduce.BatchScanConfig;
@@ -45,12 +45,9 @@ import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.Credentials;
import org.apache.accumulo.core.util.Pair;
@@ -276,8 +273,8 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
* if the table name set on the configuration doesn't exist
* @since 1.6.0
*/
- protected static TabletLocator getTabletLocator(JobConf job, String tableName) throws TableNotFoundException {
- return InputConfigurator.getTabletLocator(CLASS, job, tableName);
+ protected static TabletLocator getTabletLocator(JobConf job, String tableId) throws TableNotFoundException {
+ return InputConfigurator.getTabletLocator(CLASS, job, tableId);
}
// InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
@@ -359,34 +356,23 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
split = (RangeInputSplit) inSplit;
log.debug("Initializing input split: " + split.getRange());
Instance instance = getInstance(job);
- String user = getPrincipal(job);
+ String principal = getPrincipal(job);
AuthenticationToken token = getAuthenticationToken(job);
Authorizations authorizations = getScanAuthorizations(job);
+ // in case the table name changed, we can still use the previous name for terms of configuration,
+ // but the scanner will use the table id resolved at job setup time
BatchScanConfig tableConfig = getBatchScanConfig(job, split.getTableName());
- // in case the table name changed, we can still use the previous name for terms of configuration,
- // but for the scanner, we'll need to reference the new table name.
- String actualNameForId = split.getTableName();
- if (!(instance instanceof MockInstance)) {
- try {
- actualNameForId = Tables.getTableName(instance, split.getTableId());
- if (!actualNameForId.equals(split.getTableName()))
- log.debug("Table name changed from " + split.getTableName() + " to " + actualNameForId);
- } catch (TableNotFoundException e) {
- throw new IOException("The specified table was not found for id=" + split.getTableId());
- }
- }
try {
- log.debug("Creating connector with user: " + user);
- Connector conn = instance.getConnector(user, token);
+ log.debug("Creating connector with user: " + principal);
log.debug("Creating scanner for table: " + split.getTableName());
log.debug("Authorizations are: " + authorizations);
if (tableConfig.isOfflineScan()) {
- scanner = new OfflineScanner(instance, new Credentials(user, token), split.getTableId(), authorizations);
+ scanner = new OfflineScanner(instance, new Credentials(principal, token), split.getTableId(), authorizations);
} else {
- scanner = conn.createScanner(actualNameForId, authorizations);
+ scanner = new ScannerImpl(instance, new Credentials(principal, token), split.getTableId(), authorizations);
}
if (tableConfig.shouldUseIsolatedScanners()) {
log.info("Creating isolated scanner");
@@ -396,7 +382,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
log.info("Using local iterators");
scanner = new ClientSideIteratorScanner(scanner);
}
- setupIterators(job, scanner, split.getTableName());
+ setupIterators(job, scanner, split.getTableId());
} catch (Exception e) {
throw new IOException(e);
}
@@ -439,102 +425,13 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
}
- Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobConf job, String tableName, List<Range> ranges) throws TableNotFoundException, AccumuloException,
+ Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobConf job, String tableId, List<Range> ranges) throws TableNotFoundException, AccumuloException,
AccumuloSecurityException {
- Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-
Instance instance = getInstance(job);
Connector conn = instance.getConnector(getPrincipal(job), getAuthenticationToken(job));
- String tableId = Tables.getTableId(instance, tableName);
-
- if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
- Tables.clearCache(instance);
- if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
- throw new AccumuloException("Table is online " + tableName + "(" + tableId + ") cannot scan table in offline mode ");
- }
- }
-
- for (Range range : ranges) {
- Text startRow;
-
- if (range.getStartKey() != null)
- startRow = range.getStartKey().getRow();
- else
- startRow = new Text();
-
- Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
- Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
- scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME);
- scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
- scanner.fetchColumnFamily(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME);
- scanner.setRange(metadataRange);
-
- RowIterator rowIter = new RowIterator(scanner);
-
- KeyExtent lastExtent = null;
-
- while (rowIter.hasNext()) {
- Iterator<Map.Entry<Key,Value>> row = rowIter.next();
- String last = "";
- KeyExtent extent = null;
- String location = null;
-
- while (row.hasNext()) {
- Map.Entry<Key,Value> entry = row.next();
- Key key = entry.getKey();
-
- if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME)) {
- last = entry.getValue().toString();
- }
-
- if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME)
- || key.getColumnFamily().equals(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME)) {
- location = entry.getValue().toString();
- }
-
- if (MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
- extent = new KeyExtent(key.getRow(), entry.getValue());
- }
-
- }
-
- if (location != null)
- return null;
-
- if (!extent.getTableId().toString().equals(tableId)) {
- throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
- }
-
- if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
- throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
- }
-
- Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
- if (tabletRanges == null) {
- tabletRanges = new HashMap<KeyExtent,List<Range>>();
- binnedRanges.put(last, tabletRanges);
- }
-
- List<Range> rangeList = tabletRanges.get(extent);
- if (rangeList == null) {
- rangeList = new ArrayList<Range>();
- tabletRanges.put(extent, rangeList);
- }
-
- rangeList.add(range);
-
- if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
- break;
- }
-
- lastExtent = extent;
- }
-
- }
-
- return binnedRanges;
+
+ return InputConfigurator.binOffline(tableId, ranges, instance, conn);
}
/**
@@ -562,16 +459,18 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
TabletLocator tl;
try {
+ // resolve table name to id once, and use id from this point forward
+ tableId = Tables.getTableId(getInstance(job), tableName);
if (tableConfig.isOfflineScan()) {
- binnedRanges = binOfflineTable(job, tableName, ranges);
+ binnedRanges = binOfflineTable(job, tableId, ranges);
while (binnedRanges == null) {
// Some tablets were still online, try again
UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
- binnedRanges = binOfflineTable(job, tableName, ranges);
+ binnedRanges = binOfflineTable(job, tableId, ranges);
}
} else {
Instance instance = getInstance(job);
- tl = getTabletLocator(job, tableName);
+ tl = getTabletLocator(job, tableId);
// its possible that the cache could contain complete, but old information about a tables tablets... so clear it
tl.invalidateCache();
Credentials creds = new Credentials(getPrincipal(job), getAuthenticationToken(job));
@@ -582,7 +481,6 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
throw new TableDeletedException(tableId);
if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
throw new TableOfflineException(instance, tableId);
- tableId = Tables.getTableId(instance, tableName);
}
binnedRanges.clear();
log.warn("Unable to locate bins for specified ranges. Retrying.");
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7f6e5122/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
index 889dcbb..74f8f8b 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
@@ -35,12 +35,12 @@ import org.apache.accumulo.core.client.ClientSideIteratorScanner;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.IsolatedScanner;
-import org.apache.accumulo.core.client.RowIterator;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableDeletedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.impl.OfflineScanner;
+import org.apache.accumulo.core.client.impl.ScannerImpl;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.TabletLocator;
import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
@@ -53,8 +53,6 @@ import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.Credentials;
import org.apache.accumulo.core.util.Pair;
@@ -374,34 +372,22 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
log.debug("Initializing input split: " + split.getRange());
Instance instance = getInstance(attempt);
String principal = getPrincipal(attempt);
+ AuthenticationToken token = getAuthenticationToken(attempt);
+ Authorizations authorizations = getScanAuthorizations(attempt);
+ // in case the table name changed, we can still use the previous name for terms of configuration,
+ // but the scanner will use the table id resolved at job setup time
BatchScanConfig tableConfig = getBatchScanConfig(attempt, split.getTableName());
- // in case the table name changed, we can still use the previous name for terms of configuration,
- // but for the scanner, we'll need to reference the new table name.
- String actualNameForId = split.getTableName();
- if (!(instance instanceof MockInstance)) {
- try {
- actualNameForId = Tables.getTableName(instance, split.getTableId());
- if (!actualNameForId.equals(split.getTableName()))
- log.debug("Table name changed from " + split.getTableName() + " to " + actualNameForId);
- } catch (TableNotFoundException e) {
- throw new IOException("The specified table was not found for id=" + split.getTableId());
- }
- }
- AuthenticationToken token = getAuthenticationToken(attempt);
- Authorizations authorizations = getScanAuthorizations(attempt);
try {
log.debug("Creating connector with user: " + principal);
-
- Connector conn = instance.getConnector(principal, token);
log.debug("Creating scanner for table: " + split.getTableName());
log.debug("Authorizations are: " + authorizations);
if (tableConfig.isOfflineScan()) {
scanner = new OfflineScanner(instance, new Credentials(principal, token), split.getTableId(), authorizations);
} else {
- scanner = conn.createScanner(actualNameForId, authorizations);
+ scanner = new ScannerImpl(instance, new Credentials(principal, token), split.getTableId(), authorizations);
}
if (tableConfig.shouldUseIsolatedScanners()) {
log.info("Creating isolated scanner");
@@ -411,7 +397,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
log.info("Using local iterators");
scanner = new ClientSideIteratorScanner(scanner);
}
- setupIterators(attempt, scanner, split.getTableName());
+ setupIterators(attempt, scanner, split.getTableId());
} catch (Exception e) {
throw new IOException(e);
}
@@ -460,102 +446,15 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
}
}
- Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobContext context, String tableName, List<Range> ranges) throws TableNotFoundException,
+ Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobContext context, String tableId, List<Range> ranges) throws TableNotFoundException,
AccumuloException, AccumuloSecurityException {
- Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-
Instance instance = getInstance(context);
Connector conn = instance.getConnector(getPrincipal(context), getAuthenticationToken(context));
- String tableId = Tables.getTableId(instance, tableName);
-
- if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
- Tables.clearCache(instance);
- if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
- throw new AccumuloException("Table is online " + tableName + "(" + tableId + ") cannot scan table in offline mode ");
- }
- }
-
- for (Range range : ranges) {
- Text startRow;
-
- if (range.getStartKey() != null)
- startRow = range.getStartKey().getRow();
- else
- startRow = new Text();
-
- Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
- Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
- scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME);
- scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
- scanner.fetchColumnFamily(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME);
- scanner.setRange(metadataRange);
-
- RowIterator rowIter = new RowIterator(scanner);
- KeyExtent lastExtent = null;
- while (rowIter.hasNext()) {
- Iterator<Map.Entry<Key,Value>> row = rowIter.next();
- String last = "";
- KeyExtent extent = null;
- String location = null;
-
- while (row.hasNext()) {
- Map.Entry<Key,Value> entry = row.next();
- Key key = entry.getKey();
-
- if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME)) {
- last = entry.getValue().toString();
- }
-
- if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME)
- || key.getColumnFamily().equals(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME)) {
- location = entry.getValue().toString();
- }
-
- if (MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
- extent = new KeyExtent(key.getRow(), entry.getValue());
- }
- }
-
- if (location != null)
- return null;
-
- if (!extent.getTableId().toString().equals(tableId)) {
- throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
- }
-
- if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
- throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
- }
-
- Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
- if (tabletRanges == null) {
- tabletRanges = new HashMap<KeyExtent,List<Range>>();
- binnedRanges.put(last, tabletRanges);
- }
-
- List<Range> rangeList = tabletRanges.get(extent);
- if (rangeList == null) {
- rangeList = new ArrayList<Range>();
- tabletRanges.put(extent, rangeList);
- }
-
- rangeList.add(range);
-
- if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
- break;
- }
-
- lastExtent = extent;
- }
-
- }
-
- return binnedRanges;
+ return InputConfigurator.binOffline(tableId, ranges, instance, conn);
}
-
+
/**
* Gets the splits of the tables that have been set on the job.
*
@@ -588,24 +487,25 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
TabletLocator tl;
try {
+ // resolve table name to id once, and use id from this point forward
+ tableId = Tables.getTableId(getInstance(context), tableName);
if (tableConfig.isOfflineScan()) {
- binnedRanges = binOfflineTable(context, tableName, ranges);
+ binnedRanges = binOfflineTable(context, tableId, ranges);
while (binnedRanges == null) {
// Some tablets were still online, try again
UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
- binnedRanges = binOfflineTable(context, tableName, ranges);
+ binnedRanges = binOfflineTable(context, tableId, ranges);
}
} else {
Instance instance = getInstance(context);
- tl = getTabletLocator(context, tableName);
+ tl = getTabletLocator(context, tableId);
// its possible that the cache could contain complete, but old information about a tables tablets... so clear it
tl.invalidateCache();
Credentials creds = new Credentials(getPrincipal(context), getAuthenticationToken(context));
while (!tl.binRanges(creds, ranges, binnedRanges).isEmpty()) {
if (!(instance instanceof MockInstance)) {
- tableId = Tables.getTableId(instance, tableName);
if (!Tables.exists(instance, tableId))
throw new TableDeletedException(tableId);
if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
@@ -680,6 +580,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
this.setRange(split.getRange());
this.setLocations(split.getLocations());
this.setTableName(split.getTableName());
+ this.setTableId(split.getTableId());
}
protected RangeInputSplit(String table, String tableId, Range range, String[] locations) {
@@ -788,6 +689,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
public void readFields(DataInput in) throws IOException {
range.readFields(in);
tableName = in.readUTF();
+ tableId = in.readUTF();
int numLocs = in.readInt();
locations = new String[numLocs];
for (int i = 0; i < numLocs; ++i)
@@ -798,6 +700,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
public void write(DataOutput out) throws IOException {
range.write(out);
out.writeUTF(tableName);
+ out.writeUTF(tableId);
out.writeInt(locations.length);
for (int i = 0; i < locations.length; ++i)
out.writeUTF(locations[i]);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7f6e5122/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
index 016efa5..f72c081 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
@@ -27,12 +27,13 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.StringTokenizer;
-import com.google.common.collect.Maps;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -41,6 +42,7 @@ import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.RowIterator;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.impl.Tables;
@@ -48,8 +50,20 @@ import org.apache.accumulo.core.client.impl.TabletLocator;
import org.apache.accumulo.core.client.mapreduce.BatchScanConfig;
import org.apache.accumulo.core.client.mock.MockTabletLocator;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.util.Pair;
@@ -61,6 +75,8 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringUtils;
+import com.google.common.collect.Maps;
+
/**
* @since 1.5.0
*/
@@ -561,19 +577,19 @@ public class InputConfigurator extends ConfiguratorBase {
* the class whose name will be used as a prefix for the property configuration key
* @param conf
* the Hadoop configuration object to configure
- * @param tableName
- * The table name for which to initialize the {@link TabletLocator}
+ * @param tableId
+ * The table id for which to initialize the {@link TabletLocator}
* @return an Accumulo tablet locator
* @throws TableNotFoundException
* if the table name set on the configuration doesn't exist
* @since 1.5.0
*/
- public static TabletLocator getTabletLocator(Class<?> implementingClass, Configuration conf, String tableName) throws TableNotFoundException {
+ public static TabletLocator getTabletLocator(Class<?> implementingClass, Configuration conf, String tableId) throws TableNotFoundException {
String instanceType = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE));
if ("MockInstance".equals(instanceType))
return new MockTabletLocator();
Instance instance = getInstance(implementingClass, conf);
- return TabletLocator.getLocator(instance, new Text(Tables.getTableId(instance, tableName)));
+ return TabletLocator.getLocator(instance, new Text(tableId));
}
// InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
@@ -668,4 +684,94 @@ public class InputConfigurator extends ConfiguratorBase {
}
return null;
}
+
+ public static Map<String,Map<KeyExtent,List<Range>>> binOffline(String tableId, List<Range> ranges, Instance instance, Connector conn)
+ throws AccumuloException, TableNotFoundException {
+ Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
+
+ if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
+ Tables.clearCache(instance);
+ if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
+ throw new AccumuloException("Table is online tableId:" + tableId + " cannot scan table in offline mode ");
+ }
+ }
+
+ for (Range range : ranges) {
+ Text startRow;
+
+ if (range.getStartKey() != null)
+ startRow = range.getStartKey().getRow();
+ else
+ startRow = new Text();
+
+ Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
+ Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+ scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME);
+ scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
+ scanner.fetchColumnFamily(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME);
+ scanner.setRange(metadataRange);
+
+ RowIterator rowIter = new RowIterator(scanner);
+ KeyExtent lastExtent = null;
+ while (rowIter.hasNext()) {
+ Iterator<Map.Entry<Key,Value>> row = rowIter.next();
+ String last = "";
+ KeyExtent extent = null;
+ String location = null;
+
+ while (row.hasNext()) {
+ Map.Entry<Key,Value> entry = row.next();
+ Key key = entry.getKey();
+
+ if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME)) {
+ last = entry.getValue().toString();
+ }
+
+ if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME)
+ || key.getColumnFamily().equals(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME)) {
+ location = entry.getValue().toString();
+ }
+
+ if (MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
+ extent = new KeyExtent(key.getRow(), entry.getValue());
+ }
+
+ }
+
+ if (location != null)
+ return null;
+
+ if (!extent.getTableId().toString().equals(tableId)) {
+ throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
+ }
+
+ if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
+ throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
+ }
+
+ Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
+ if (tabletRanges == null) {
+ tabletRanges = new HashMap<KeyExtent,List<Range>>();
+ binnedRanges.put(last, tabletRanges);
+ }
+
+ List<Range> rangeList = tabletRanges.get(extent);
+ if (rangeList == null) {
+ rangeList = new ArrayList<Range>();
+ tabletRanges.put(extent, rangeList);
+ }
+
+ rangeList.add(range);
+
+ if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
+ break;
+ }
+
+ lastExtent = extent;
+ }
+
+ }
+ return binnedRanges;
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7f6e5122/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java
index 11ddf7b..501b7e6 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java
@@ -94,6 +94,9 @@ public class UniqueColumns extends Configured implements Tool {
String clone = opts.tableName;
Connector conn = null;
+
+ opts.setAccumuloConfigs(job);
+
if (opts.offline) {
/*
* this example clones the table and takes it offline. If you plan to run map reduce jobs over a table many times, it may be more efficient to compact the
@@ -106,11 +109,11 @@ public class UniqueColumns extends Configured implements Tool {
conn.tableOperations().offline(clone);
AccumuloInputFormat.setOfflineTableScan(job, true);
+ AccumuloInputFormat.setInputTableName(job, clone);
}
job.setInputFormatClass(AccumuloInputFormat.class);
- opts.setAccumuloConfigs(job);
-
+
job.setMapperClass(UMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
[2/3] git commit: ACCUMULO-1762 fixed issues w/ offline map reduce and fully qualified paths
Posted by kt...@apache.org.
ACCUMULO-1762 fixed issues w/ offline map reduce and fully qualified paths
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/388f87dc
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/388f87dc
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/388f87dc
Branch: refs/heads/master
Commit: 388f87dc1b7472abbb053d8a3604f3444f03d2a7
Parents: 86669f7
Author: Keith Turner <kt...@apache.org>
Authored: Tue Oct 22 20:52:11 2013 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Tue Oct 22 22:40:23 2013 -0400
----------------------------------------------------------------------
.../accumulo/core/client/impl/OfflineScanner.java | 14 ++++++++------
1 file changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/388f87dc/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
index 9f6f3cd..0231fad 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
@@ -67,6 +67,7 @@ import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
class OfflineIterator implements Iterator<Entry<Key,Value>> {
@@ -227,13 +228,10 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> {
throw new AccumuloException(" " + currentExtent + " is not previous extent " + extent);
String tablesDir = instance.getConfiguration().get(Property.INSTANCE_DFS_DIR) + "/tables";
- String[] volumes = instance.getConfiguration().get(Property.INSTANCE_VOLUMES).split(",");
- if (volumes.length > 1) {
- tablesDir = volumes[0] + tablesDir;
- }
+
List<String> absFiles = new ArrayList<String>();
for (String relPath : relFiles) {
- if (relFiles.contains(":")) {
+ if (relPath.contains(":")) {
absFiles.add(relPath);
} else {
// handle old-style relative paths
@@ -298,7 +296,7 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> {
Configuration conf = CachedConfiguration.getInstance();
- FileSystem fs = FileUtil.getFileSystem(conf, instance.getConfiguration());
+ FileSystem defaultFs = FileUtil.getFileSystem(conf, instance.getConfiguration());
for (SortedKeyValueIterator<Key,Value> reader : readers) {
((FileSKVIterator) reader).close();
@@ -308,6 +306,10 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> {
// TODO need to close files - ACCUMULO-1303
for (String file : absFiles) {
+ FileSystem fs = defaultFs;
+ if (file.contains(":"))
+ fs = new Path(file).getFileSystem(conf);
+
FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs, conf, acuTableConf, null, null);
readers.add(reader);
}