You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2015/04/22 00:23:49 UTC
[8/9] accumulo git commit: ACCUMULO-3602 ACCUMULO-3657 Minimize
AccumuloInputSplit in API
ACCUMULO-3602 ACCUMULO-3657 Minimize AccumuloInputSplit in API
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d1e6e79c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d1e6e79c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d1e6e79c
Branch: refs/heads/master
Commit: d1e6e79cf12ee420dd1d20fd605723f0e5505f68
Parents: c625291
Author: Keith Turner <kt...@apache.org>
Authored: Tue Apr 21 17:30:48 2015 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Tue Apr 21 17:53:32 2015 -0400
----------------------------------------------------------------------
.../core/client/mapred/AbstractInputFormat.java | 58 ++++++++--------
.../client/mapreduce/AbstractInputFormat.java | 56 ++++++++-------
.../core/client/mapreduce/RangeInputSplit.java | 24 ++-----
.../mapreduce/impl/AccumuloInputSplit.java | 73 +++++++++-----------
.../client/mapreduce/impl/BatchInputSplit.java | 21 ++----
.../core/client/mapreduce/impl/SplitUtils.java | 59 ++++++++++++++++
6 files changed, 162 insertions(+), 129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d1e6e79c/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
index b97d4de..f2e3a79 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
@@ -29,18 +29,18 @@ import java.util.Random;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.ClientSideIteratorScanner;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.client.TableDeletedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.TableOfflineException;
-import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
import org.apache.accumulo.core.client.admin.SecurityOperations;
import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier;
@@ -54,6 +54,7 @@ import org.apache.accumulo.core.client.impl.TabletLocator;
import org.apache.accumulo.core.client.mapred.impl.BatchInputSplit;
import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
import org.apache.accumulo.core.client.mapreduce.impl.AccumuloInputSplit;
+import org.apache.accumulo.core.client.mapreduce.impl.SplitUtils;
import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase;
import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator;
import org.apache.accumulo.core.client.mock.MockInstance;
@@ -394,7 +395,8 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
protected abstract static class AbstractRecordReader<K,V> implements RecordReader<K,V> {
protected long numKeysRead;
protected Iterator<Map.Entry<Key,Value>> scannerIterator;
- protected org.apache.accumulo.core.client.mapreduce.impl.AccumuloInputSplit split;
+ protected RangeInputSplit split;
+ private org.apache.accumulo.core.client.mapreduce.impl.AccumuloInputSplit aiSplit;
protected ScannerBase scannerBase;
@@ -458,42 +460,42 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
* Initialize a scanner over the given input split using this task attempt configuration.
*/
public void initialize(InputSplit inSplit, JobConf job) throws IOException {
- split = (AccumuloInputSplit) inSplit;
- log.debug("Initializing input split: " + split.toString());
+ aiSplit = (AccumuloInputSplit) inSplit;
+ log.debug("Initializing input split: " + aiSplit.toString());
- Instance instance = split.getInstance(getClientConfiguration(job));
+ Instance instance = aiSplit.getInstance(getClientConfiguration(job));
if (null == instance) {
instance = getInstance(job);
}
- String principal = split.getPrincipal();
+ String principal = aiSplit.getPrincipal();
if (null == principal) {
principal = getPrincipal(job);
}
- AuthenticationToken token = split.getToken();
+ AuthenticationToken token = aiSplit.getToken();
if (null == token) {
token = getAuthenticationToken(job);
}
- Authorizations authorizations = split.getAuths();
+ Authorizations authorizations = aiSplit.getAuths();
if (null == authorizations) {
authorizations = getScanAuthorizations(job);
}
- String table = split.getTableName();
+ String table = aiSplit.getTableName();
// in case the table name changed, we can still use the previous name for terms of configuration,
// but the scanner will use the table id resolved at job setup time
- InputTableConfig tableConfig = getInputTableConfig(job, split.getTableName());
+ InputTableConfig tableConfig = getInputTableConfig(job, aiSplit.getTableName());
log.debug("Creating connector with user: " + principal);
log.debug("Creating scanner for table: " + table);
log.debug("Authorizations are: " + authorizations);
- if (split instanceof org.apache.accumulo.core.client.mapreduce.RangeInputSplit) {
- org.apache.accumulo.core.client.mapreduce.RangeInputSplit rangeSplit = (org.apache.accumulo.core.client.mapreduce.RangeInputSplit) split;
-
+ if (aiSplit instanceof RangeInputSplit) {
+ RangeInputSplit rangeSplit = (RangeInputSplit) aiSplit;
+ split = rangeSplit;
Boolean isOffline = rangeSplit.isOffline();
if (null == isOffline) {
isOffline = tableConfig.isOfflineScan();
@@ -513,13 +515,13 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
try {
if (isOffline) {
- scanner = new OfflineScanner(instance, new Credentials(principal, token), split.getTableId(), authorizations);
+ scanner = new OfflineScanner(instance, new Credentials(principal, token), aiSplit.getTableId(), authorizations);
} else if (instance instanceof MockInstance) {
- scanner = instance.getConnector(principal, token).createScanner(split.getTableName(), authorizations);
+ scanner = instance.getConnector(principal, token).createScanner(aiSplit.getTableName(), authorizations);
} else {
ClientConfiguration clientConf = getClientConfiguration(job);
ClientContext context = new ClientContext(instance, new Credentials(principal, token), clientConf);
- scanner = new ScannerImpl(context, split.getTableId(), authorizations);
+ scanner = new ScannerImpl(context, aiSplit.getTableId(), authorizations);
}
if (isIsolated) {
log.info("Creating isolated scanner");
@@ -529,7 +531,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
log.info("Using local iterators");
scanner = new ClientSideIteratorScanner(scanner);
}
- setupIterators(job, scanner, split.getTableName(), split);
+ setupIterators(job, scanner, aiSplit.getTableName(), aiSplit);
} catch (Exception e) {
throw new IOException(e);
}
@@ -537,15 +539,15 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
scanner.setRange(rangeSplit.getRange());
scannerBase = scanner;
- } else if (split instanceof BatchInputSplit) {
+ } else if (aiSplit instanceof BatchInputSplit) {
BatchScanner scanner;
- BatchInputSplit multiRangeSplit = (BatchInputSplit) split;
+ BatchInputSplit multiRangeSplit = (BatchInputSplit) aiSplit;
try{
// Note: BatchScanner will use at most one thread per tablet, currently BatchInputSplit will not span tablets
int scanThreads = 1;
- scanner = instance.getConnector(principal, token).createBatchScanner(split.getTableName(), authorizations, scanThreads);
- setupIterators(job, scanner, split.getTableName(), split);
+ scanner = instance.getConnector(principal, token).createBatchScanner(aiSplit.getTableName(), authorizations, scanThreads);
+ setupIterators(job, scanner, aiSplit.getTableName(), aiSplit);
} catch (Exception e) {
throw new IOException(e);
}
@@ -554,10 +556,10 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
scannerBase = scanner;
} else {
- throw new IllegalArgumentException("Can not initialize from " + split.getClass().toString());
+ throw new IllegalArgumentException("Can not initialize from " + aiSplit.getClass().toString());
}
- Collection<Pair<Text,Text>> columns = split.getFetchedColumns();
+ Collection<Pair<Text,Text>> columns = aiSplit.getFetchedColumns();
if (null == columns) {
columns = tableConfig.getFetchedColumns();
}
@@ -593,7 +595,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
public float getProgress() throws IOException {
if (numKeysRead > 0 && currentKey == null)
return 1.0f;
- return split.getProgress(currentKey);
+ return aiSplit.getProgress(currentKey);
}
protected Key currentKey = null;
@@ -721,7 +723,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
clippedRanges.add(ke.clip(r));
BatchInputSplit split = new BatchInputSplit(tableName, tableId, clippedRanges, new String[] {location});
- AccumuloInputSplit.updateSplit(split, instance, tableConfig, principal, token, auths, logLevel);
+ SplitUtils.updateSplit(split, instance, tableConfig, principal, token, auths, logLevel);
splits.add(split);
} else {
@@ -730,7 +732,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
if (autoAdjust) {
// divide ranges into smaller ranges, based on the tablets
RangeInputSplit split = new RangeInputSplit(tableName, tableId, ke.clip(r), new String[] {location});
- AccumuloInputSplit.updateSplit(split, instance, tableConfig, principal, token, auths, logLevel);
+ SplitUtils.updateSplit(split, instance, tableConfig, principal, token, auths, logLevel);
split.setOffline(tableConfig.isOfflineScan());
split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());
@@ -752,7 +754,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
if (!autoAdjust)
for (Map.Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet()) {
RangeInputSplit split = new RangeInputSplit(tableName, tableId, entry.getKey(), entry.getValue().toArray(new String[0]));
- AccumuloInputSplit.updateSplit(split, instance, tableConfig, principal, token, auths, logLevel);
+ SplitUtils.updateSplit(split, instance, tableConfig, principal, token, auths, logLevel);
split.setOffline(tableConfig.isOfflineScan());
split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d1e6e79c/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
index c7a304c..d402bb0 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
@@ -29,13 +29,14 @@ import java.util.Random;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.ClientSideIteratorScanner;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.client.TableDeletedException;
import org.apache.accumulo.core.client.TableNotFoundException;
@@ -52,6 +53,7 @@ import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.TabletLocator;
import org.apache.accumulo.core.client.mapreduce.impl.AccumuloInputSplit;
import org.apache.accumulo.core.client.mapreduce.impl.BatchInputSplit;
+import org.apache.accumulo.core.client.mapreduce.impl.SplitUtils;
import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase;
import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator;
import org.apache.accumulo.core.client.mock.MockInstance;
@@ -67,7 +69,6 @@ import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
@@ -425,7 +426,8 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
protected long numKeysRead;
protected Iterator<Map.Entry<Key,Value>> scannerIterator;
protected ScannerBase scannerBase;
- protected AccumuloInputSplit split;
+ protected RangeInputSplit split;
+ private AccumuloInputSplit aiSplit;
/**
* Extracts Iterators settings from the context to be used by RecordReader.
@@ -489,41 +491,42 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
@Override
public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException {
- split = (AccumuloInputSplit) inSplit;
- log.debug("Initializing input split: " + split.toString());
+ aiSplit = (AccumuloInputSplit) inSplit;
+ log.debug("Initializing input split: " + aiSplit.toString());
- Instance instance = split.getInstance(getClientConfiguration(attempt));
+ Instance instance = aiSplit.getInstance(getClientConfiguration(attempt));
if (null == instance) {
instance = getInstance(attempt);
}
- String principal = split.getPrincipal();
+ String principal = aiSplit.getPrincipal();
if (null == principal) {
principal = getPrincipal(attempt);
}
- AuthenticationToken token = split.getToken();
+ AuthenticationToken token = aiSplit.getToken();
if (null == token) {
token = getAuthenticationToken(attempt);
}
- Authorizations authorizations = split.getAuths();
+ Authorizations authorizations = aiSplit.getAuths();
if (null == authorizations) {
authorizations = getScanAuthorizations(attempt);
}
- String table = split.getTableName();
+ String table = aiSplit.getTableName();
// in case the table name changed, we can still use the previous name for terms of configuration,
// but the scanner will use the table id resolved at job setup time
- InputTableConfig tableConfig = getInputTableConfig(attempt, split.getTableName());
+ InputTableConfig tableConfig = getInputTableConfig(attempt, aiSplit.getTableName());
log.debug("Creating connector with user: " + principal);
log.debug("Creating scanner for table: " + table);
log.debug("Authorizations are: " + authorizations);
- if (split instanceof RangeInputSplit) {
- RangeInputSplit rangeSplit = (RangeInputSplit) split;
+ if (aiSplit instanceof RangeInputSplit) {
+ RangeInputSplit rangeSplit = (RangeInputSplit) aiSplit;
+ split = rangeSplit;
Scanner scanner;
Boolean isOffline = rangeSplit.isOffline();
@@ -543,13 +546,13 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
try {
if (isOffline) {
- scanner = new OfflineScanner(instance, new Credentials(principal, token), split.getTableId(), authorizations);
+ scanner = new OfflineScanner(instance, new Credentials(principal, token), aiSplit.getTableId(), authorizations);
} else if (instance instanceof MockInstance) {
- scanner = instance.getConnector(principal, token).createScanner(split.getTableName(), authorizations);
+ scanner = instance.getConnector(principal, token).createScanner(aiSplit.getTableName(), authorizations);
} else {
ClientConfiguration clientConf = getClientConfiguration(attempt);
ClientContext context = new ClientContext(instance, new Credentials(principal, token), clientConf);
- scanner = new ScannerImpl(context, split.getTableId(), authorizations);
+ scanner = new ScannerImpl(context, aiSplit.getTableId(), authorizations);
}
if (isIsolated) {
log.info("Creating isolated scanner");
@@ -560,7 +563,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
scanner = new ClientSideIteratorScanner(scanner);
}
- setupIterators(attempt, scanner, split.getTableName(), split);
+ setupIterators(attempt, scanner, aiSplit.getTableName(), aiSplit);
} catch (Exception e) {
throw new IOException(e);
}
@@ -568,16 +571,17 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
scanner.setRange(rangeSplit.getRange());
scannerBase = scanner;
- } else if (split instanceof BatchInputSplit) {
- BatchInputSplit batchSplit = (BatchInputSplit) split;
+ } else if (aiSplit instanceof BatchInputSplit) {
+ BatchInputSplit batchSplit = (BatchInputSplit) aiSplit;
BatchScanner scanner;
try{
// Note: BatchScanner will use at most one thread per tablet, currently BatchInputSplit will not span tablets
int scanThreads = 1;
- scanner = instance.getConnector(principal, token).createBatchScanner(split.getTableName(), authorizations, scanThreads);
- setupIterators(attempt, scanner, split.getTableName(), split);
+ scanner = instance.getConnector(principal, token).createBatchScanner(aiSplit.getTableName(), authorizations, scanThreads);
+ setupIterators(attempt, scanner, aiSplit.getTableName(), aiSplit);
} catch (Exception e) {
+ e.printStackTrace();
throw new IOException(e);
}
@@ -585,7 +589,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
scannerBase = scanner;
}
- Collection<Pair<Text,Text>> columns = split.getFetchedColumns();
+ Collection<Pair<Text,Text>> columns = aiSplit.getFetchedColumns();
if (null == columns) {
columns = tableConfig.getFetchedColumns();
}
@@ -616,7 +620,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
public float getProgress() throws IOException {
if (numKeysRead > 0 && currentKey == null)
return 1.0f;
- return split.getProgress(currentKey);
+ return aiSplit.getProgress(currentKey);
}
/**
@@ -767,7 +771,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
for(Range r: extentRanges.getValue())
clippedRanges.add(ke.clip(r));
BatchInputSplit split = new BatchInputSplit(tableName, tableId, clippedRanges, new String[] {location});
- AccumuloInputSplit.updateSplit(split, instance, tableConfig, principal, token, auths, logLevel);
+ SplitUtils.updateSplit(split, instance, tableConfig, principal, token, auths, logLevel);
splits.add(split);
} else {
@@ -776,7 +780,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
if (autoAdjust) {
// divide ranges into smaller ranges, based on the tablets
RangeInputSplit split = new RangeInputSplit(tableName, tableId, ke.clip(r), new String[] {location});
- AccumuloInputSplit.updateSplit(split, instance, tableConfig, principal, token, auths, logLevel);
+ SplitUtils.updateSplit(split, instance, tableConfig, principal, token, auths, logLevel);
split.setOffline(tableConfig.isOfflineScan());
split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());
@@ -798,7 +802,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
if (!autoAdjust)
for (Map.Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet()) {
RangeInputSplit split = new RangeInputSplit(tableName, tableId, entry.getKey(), entry.getValue().toArray(new String[0]));
- AccumuloInputSplit.updateSplit(split, instance, tableConfig, principal, token, auths, logLevel);
+ SplitUtils.updateSplit(split, instance, tableConfig, principal, token, auths, logLevel);
split.setOffline(tableConfig.isOfflineScan());
split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d1e6e79c/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
index 6c870a0..9851192 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
@@ -19,9 +19,9 @@ package org.apache.accumulo.core.client.mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.Arrays;
import org.apache.accumulo.core.client.mapreduce.impl.AccumuloInputSplit;
+import org.apache.accumulo.core.client.mapreduce.impl.SplitUtils;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
@@ -47,6 +47,7 @@ public class RangeInputSplit extends AccumuloInputSplit {
this.range = range;
}
+ @Override
public float getProgress(Key currentKey) {
if (currentKey == null)
return 0f;
@@ -55,13 +56,13 @@ public class RangeInputSplit extends AccumuloInputSplit {
if (range.getStartKey() != null && range.getEndKey() != null) {
if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW) != 0) {
// just look at the row progress
- return getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(), currentKey.getRowData());
+ return SplitUtils.getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(), currentKey.getRowData());
} else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM) != 0) {
// just look at the column family progress
- return getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData());
+ return SplitUtils.getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData());
} else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL) != 0) {
// just look at the column qualifier progress
- return getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(), currentKey.getColumnQualifierData());
+ return SplitUtils.getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(), currentKey.getColumnQualifierData());
}
}
}
@@ -124,23 +125,10 @@ public class RangeInputSplit extends AccumuloInputSplit {
StringBuilder sb = new StringBuilder(256);
sb.append("RangeInputSplit:");
sb.append(" Range: ").append(range);
- sb.append(" Locations: ").append(Arrays.asList(locations));
- sb.append(" Table: ").append(tableName);
- sb.append(" TableID: ").append(tableId);
- sb.append(" InstanceName: ").append(instanceName);
- sb.append(" zooKeepers: ").append(zooKeepers);
- sb.append(" principal: ").append(principal);
- sb.append(" tokenSource: ").append(tokenSource);
- sb.append(" authenticationToken: ").append(token);
- sb.append(" authenticationTokenFile: ").append(tokenFile);
- sb.append(" Authorizations: ").append(auths);
+ sb.append(super.toString());
sb.append(" offlineScan: ").append(offline);
- sb.append(" mockInstance: ").append(mockInstance);
sb.append(" isolatedScan: ").append(isolatedScan);
sb.append(" localIterators: ").append(localIterators);
- sb.append(" fetchColumns: ").append(fetchedColumns);
- sb.append(" iterators: ").append(iterators);
- sb.append(" logLevel: ").append(level);
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d1e6e79c/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/AccumuloInputSplit.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/AccumuloInputSplit.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/AccumuloInputSplit.java
index 94d0026..7f83936 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/AccumuloInputSplit.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/AccumuloInputSplit.java
@@ -21,7 +21,6 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -33,18 +32,17 @@ import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase.TokenSource;
import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.Base64;
import org.apache.accumulo.core.util.Pair;
-import org.apache.accumulo.core.data.Key;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -58,16 +56,16 @@ import org.apache.log4j.Level;
* @see org.apache.accumulo.core.client.mapreduce.impl.BatchInputSplit
*/
public abstract class AccumuloInputSplit extends InputSplit implements Writable {
- protected String[] locations;
- protected String tableId, tableName, instanceName, zooKeepers, principal;
- protected TokenSource tokenSource;
- protected String tokenFile;
- protected AuthenticationToken token;
- protected Boolean mockInstance;
- protected Authorizations auths;
- protected Set<Pair<Text,Text>> fetchedColumns;
- protected List<IteratorSetting> iterators;
- protected Level level;
+ private String[] locations;
+ private String tableId, tableName, instanceName, zooKeepers, principal;
+ private TokenSource tokenSource;
+ private String tokenFile;
+ private AuthenticationToken token;
+ private Boolean mockInstance;
+ private Authorizations auths;
+ private Set<Pair<Text,Text>> fetchedColumns;
+ private List<IteratorSetting> iterators;
+ private Level level;
public abstract float getProgress(Key currentKey);
@@ -89,26 +87,7 @@ public abstract class AccumuloInputSplit extends InputSplit implements Writable
this.tableId = tableId;
}
- /**
- * Central place to set common split configuration not handled by split constructors.
- * The intention is to make it harder to miss optional setters in future refactor.
- */
- public static void updateSplit(AccumuloInputSplit split, Instance instance, InputTableConfig tableConfig,
- String principal, AuthenticationToken token, Authorizations auths, Level logLevel) {
- split.setInstanceName(instance.getInstanceName());
- split.setZooKeepers(instance.getZooKeepers());
- split.setMockInstance(instance instanceof MockInstance);
-
- split.setPrincipal(principal);
- split.setToken(token);
- split.setAuths(auths);
-
- split.setFetchedColumns(tableConfig.getFetchedColumns());
- split.setIterators(tableConfig.getIterators());
- split.setLogLevel(logLevel);
- }
-
- private static byte[] extractBytes(ByteSequence seq, int numBytes) {
+ static byte[] extractBytes(ByteSequence seq, int numBytes) {
byte[] bytes = new byte[numBytes + 1];
bytes[0] = 0;
for (int i = 0; i < numBytes; i++) {
@@ -120,14 +99,6 @@ public abstract class AccumuloInputSplit extends InputSplit implements Writable
return bytes;
}
- public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position) {
- int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length());
- BigInteger startBI = new BigInteger(extractBytes(start, maxDepth));
- BigInteger endBI = new BigInteger(extractBytes(end, maxDepth));
- BigInteger positionBI = new BigInteger(extractBytes(position, maxDepth));
- return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue());
- }
-
public long getRangeLength(Range range) throws IOException {
Text startRow = range.isInfiniteStartKey() ? new Text(new byte[] {Byte.MIN_VALUE}) : range.getStartKey().getRow();
Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow();
@@ -442,4 +413,24 @@ public abstract class AccumuloInputSplit extends InputSplit implements Writable
public void setLogLevel(Level level) {
this.level = level;
}
+
+ @Override
+ public String toString(){
+ StringBuilder sb = new StringBuilder(256);
+ sb.append(" Locations: ").append(Arrays.asList(locations));
+ sb.append(" Table: ").append(tableName);
+ sb.append(" TableID: ").append(tableId);
+ sb.append(" InstanceName: ").append(instanceName);
+ sb.append(" zooKeepers: ").append(zooKeepers);
+ sb.append(" principal: ").append(principal);
+ sb.append(" tokenSource: ").append(tokenSource);
+ sb.append(" authenticationToken: ").append(token);
+ sb.append(" authenticationTokenFile: ").append(tokenFile);
+ sb.append(" Authorizations: ").append(auths);
+ sb.append(" mockInstance: ").append(mockInstance);
+ sb.append(" fetchColumns: ").append(fetchedColumns);
+ sb.append(" iterators: ").append(iterators);
+ sb.append(" logLevel: ").append(level);
+ return sb.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d1e6e79c/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplit.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplit.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplit.java
index 269622a..24b9ef3 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplit.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplit.java
@@ -53,6 +53,7 @@ public class BatchInputSplit extends AccumuloInputSplit {
/**
* Save progress on each call to this function, implied by value of currentKey, and return average ranges in the split
*/
+ @Override
public float getProgress(Key currentKey) {
if (null == rangeProgress)
rangeProgress = new float[ranges.size()];
@@ -70,13 +71,13 @@ public class BatchInputSplit extends AccumuloInputSplit {
if (range.getStartKey() != null && range.getEndKey() != null) {
if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW) != 0) {
// just look at the row progress
- rangeProgress[i] = getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(), currentKey.getRowData());
+ rangeProgress[i] = SplitUtils.getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(), currentKey.getRowData());
} else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM) != 0) {
// just look at the column family progress
- rangeProgress[i] = getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData());
+ rangeProgress[i] = SplitUtils.getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData());
} else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL) != 0) {
// just look at the column qualifier progress
- rangeProgress[i] = getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(), currentKey.getColumnQualifierData());
+ rangeProgress[i] = SplitUtils.getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(), currentKey.getColumnQualifierData());
}
}
total += rangeProgress[i];
@@ -126,19 +127,7 @@ public class BatchInputSplit extends AccumuloInputSplit {
StringBuilder sb = new StringBuilder(256);
sb.append("BatchInputSplit:");
sb.append(" Ranges: ").append(Arrays.asList(ranges));
- sb.append(" Location: ").append(Arrays.asList(locations));
- sb.append(" Table: ").append(tableName);
- sb.append(" TableID: ").append(tableId);
- sb.append(" InstanceName: ").append(instanceName);
- sb.append(" zooKeepers: ").append(zooKeepers);
- sb.append(" principal: ").append(principal);
- sb.append(" tokenSource: ").append(tokenSource);
- sb.append(" authenticationToken: ").append(token);
- sb.append(" authenticationTokenFile: ").append(tokenFile);
- sb.append(" Authorizations: ").append(auths);
- sb.append(" fetchColumns: ").append(fetchedColumns);
- sb.append(" iterators: ").append(iterators);
- sb.append(" logLevel: ").append(level);
+ sb.append(super.toString());
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d1e6e79c/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/SplitUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/SplitUtils.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/SplitUtils.java
new file mode 100644
index 0000000..0aee665
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/SplitUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.mapreduce.impl;
+
+import java.math.BigInteger;
+
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.log4j.Level;
+
+public class SplitUtils {
+
+ /**
+ * Central place to set common split configuration not handled by the split constructors.
+ * The intention is to make it harder to miss optional setters in future refactors.
+ */
+ public static void updateSplit(AccumuloInputSplit split, Instance instance, InputTableConfig tableConfig,
+ String principal, AuthenticationToken token, Authorizations auths, Level logLevel) {
+ split.setInstanceName(instance.getInstanceName());
+ split.setZooKeepers(instance.getZooKeepers());
+ split.setMockInstance(instance instanceof MockInstance);
+
+ split.setPrincipal(principal);
+ split.setToken(token);
+ split.setAuths(auths);
+
+ split.setFetchedColumns(tableConfig.getFetchedColumns());
+ split.setIterators(tableConfig.getIterators());
+ split.setLogLevel(logLevel);
+ }
+
+ public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position) {
+ int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length());
+ BigInteger startBI = new BigInteger(AccumuloInputSplit.extractBytes(start, maxDepth));
+ BigInteger endBI = new BigInteger(AccumuloInputSplit.extractBytes(end, maxDepth));
+ BigInteger positionBI = new BigInteger(AccumuloInputSplit.extractBytes(position, maxDepth));
+ return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue());
+ }
+
+}