You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/01/24 03:15:48 UTC
[15/21] git commit: Merge branch '1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT
Merge branch '1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT
Conflicts:
core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
test/system/continuous/run-verify.sh
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/23bb4321
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/23bb4321
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/23bb4321
Branch: refs/heads/master
Commit: 23bb43210afcf7fa57ad1fba19ec62f228831908
Parents: a147acd 36cec4f
Author: Josh Elser <el...@apache.org>
Authored: Thu Jan 23 19:07:16 2014 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Thu Jan 23 21:00:49 2014 -0500
----------------------------------------------------------------------
.../core/client/impl/OfflineScanner.java | 17 +-
.../core/client/mapreduce/RangeInputSplit.java | 13 +-
.../mapreduce/lib/util/ConfiguratorBase.java | 8 +-
.../core/client/mapreduce/InputFormatBase.java | 1634 ------------------
.../test/continuous/ContinuousVerify.java | 21 -
test/system/continuous/run-verify.sh | 24 +-
6 files changed, 15 insertions(+), 1702 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/23bb4321/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
index 3a49608,0000000..5f3069a
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
@@@ -1,411 -1,0 +1,416 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
++import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
++import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.RowIterator;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
++import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.KeyValue;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.FileUtil;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
+import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
+import org.apache.accumulo.core.iterators.system.DeletingIterator;
+import org.apache.accumulo.core.iterators.system.MultiIterator;
+import org.apache.accumulo.core.iterators.system.VisibilityFilter;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.security.CredentialHelper;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.util.ArgumentChecker;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.LocalityGroupUtil;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+
+class OfflineIterator implements Iterator<Entry<Key,Value>> {
+
+ static class OfflineIteratorEnvironment implements IteratorEnvironment { // iterator environment handed to the scan iterators when reading an offline table on the client
+ @Override
+ public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String mapFileName) throws IOException {
+ throw new NotImplementedException(); // reserving map file readers is not supported by this environment
+ }
+
+ @Override
+ public AccumuloConfiguration getConfig() {
+ return AccumuloConfiguration.getDefaultConfiguration(); // always the default configuration, regardless of table or site settings
+ }
+
+ @Override
+ public IteratorScope getIteratorScope() {
+ return IteratorScope.scan; // this environment is only used for scans
+ }
+
+ @Override
+ public boolean isFullMajorCompaction() {
+ return false;
+ }
+
+ private ArrayList<SortedKeyValueIterator<Key,Value>> topLevelIterators = new ArrayList<SortedKeyValueIterator<Key,Value>>(); // iterators collected through registerSideChannel
+
+ @Override
+ public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {
+ topLevelIterators.add(iter);
+ }
+
+ SortedKeyValueIterator<Key,Value> getTopLevelIterator(SortedKeyValueIterator<Key,Value> iter) {
+ if (topLevelIterators.isEmpty())
+ return iter; // no side channels registered: the given iterator is already the top of the stack
+ ArrayList<SortedKeyValueIterator<Key,Value>> allIters = new ArrayList<SortedKeyValueIterator<Key,Value>>(topLevelIterators);
+ allIters.add(iter);
+ return new MultiIterator(allIters, false); // wrap the registered side channels together with the given iterator
+ }
+ }
+
+ private SortedKeyValueIterator<Key,Value> iter;
+ private Range range;
+ private KeyExtent currentExtent;
+ private Connector conn;
+ private String tableId;
+ private Authorizations authorizations;
+ private Instance instance;
+ private ScannerOptions options;
+ private ArrayList<SortedKeyValueIterator<Key,Value>> readers;
-
++ private AccumuloConfiguration config;
++
+ /**
+ * Creates the iterator, connects to the instance and advances to the first tablet that has data.
+ * Any exception raised while connecting or opening tablets is rethrown wrapped in a RuntimeException.
+ *
+ * @param options
+ * scan options (fetched columns, server-side iterators); a new ScannerOptions is built from them
+ * @param instance
+ * instance used to obtain the Connector and to check the table state
+ * @param credentials
+ * supplies the principal and the authentication token for the Connector
+ * @param authorizations
+ * authorizations passed to the visibility filter
+ * @param table
+ * the table id (stored via toString())
+ * @param range
+ * range to scan; bounded by the first and last fetched column when columns are fetched
+ */
+ public OfflineIterator(ScannerOptions options, Instance instance, TCredentials credentials, Authorizations authorizations, Text table, Range range) {
+ this.options = new ScannerOptions(options);
+ this.instance = instance;
+ this.range = range;
+
+ if (this.options.fetchedColumns.size() > 0) {
+ this.range = range.bound(this.options.fetchedColumns.first(), this.options.fetchedColumns.last()); // bound the range by the first and last fetched columns
+ }
+
+ this.tableId = table.toString();
+ this.authorizations = authorizations;
+ this.readers = new ArrayList<SortedKeyValueIterator<Key,Value>>();
+
+ try {
+ conn = instance.getConnector(credentials.getPrincipal(), CredentialHelper.extractToken(credentials));
++ config = new ConfigurationCopy(conn.instanceOperations().getSiteConfiguration()); // copy of the site configuration fetched through the connector; used later for the tables dir and file system
+ nextTablet();
+
+ while (iter != null && !iter.hasTop())
+ nextTablet(); // skip tablets that hold no data for the range
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iter != null && iter.hasTop(); // nextTablet() sets iter to null when no further tablet is in range
+ }
+
+ @Override
+ public Entry<Key,Value> next() {
+ try { // NOTE(review): no hasNext() guard; with iter == null the NullPointerException is caught below and surfaces as RuntimeException, not NoSuchElementException
+ byte[] v = iter.getTopValue().get();
+ // copy just like tablet server does, do this before calling next
+ KeyValue ret = new KeyValue(new Key(iter.getTopKey()), Arrays.copyOf(v, v.length));
+
+ iter.next();
+
+ while (iter != null && !iter.hasTop())
+ nextTablet(); // move on to following tablets until one has data or the range is exhausted
+
+ return ret;
+ } catch (Exception e) {
+ throw new RuntimeException(e); // checked exceptions from nextTablet() are rethrown unchecked with their cause
+ }
+ }
+
+ /**
+ * Advances to the next tablet of the table: looks up its files in the metadata table, opens them and seeks the
+ * new iterator stack to the scan range. Sets iter to null when there is no further tablet to read.
+ *
+ * @throws TableNotFoundException
+ * declared by the metadata lookup and by iterator creation
+ * @throws IOException
+ * declared by iterator creation
+ * @throws AccumuloException
+ * if the table is found to be online, if the metadata entry belongs to another table, or if the
+ * tablet found does not directly follow the current one
+ *
+ */
+ private void nextTablet() throws TableNotFoundException, AccumuloException, IOException {
+
+ Range nextRange = null;
+
+ if (currentExtent == null) { // first call: start at the metadata entry for the range's start row
+ Text startRow;
+
+ if (range.getStartKey() != null)
+ startRow = range.getStartKey().getRow();
+ else
+ startRow = new Text();
+
+ nextRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
+ } else {
+
+ if (currentExtent.getEndRow() == null) { // current tablet has no end row, so nothing follows it
+ iter = null;
+ return;
+ }
+
+ if (range.afterEndKey(new Key(currentExtent.getEndRow()).followingKey(PartialKey.ROW))) { // the row after this tablet is already past the end of the scan range
+ iter = null;
+ return;
+ }
+
+ nextRange = new Range(currentExtent.getMetadataEntry(), false, null, false); // continue after the current tablet's metadata entry (exclusive)
+ }
+
+ List<String> relFiles = new ArrayList<String>();
+
+ Pair<KeyExtent,String> eloc = getTabletFiles(nextRange, relFiles);
+
+ while (eloc.getSecond() != null) { // a location entry is present: re-check the table state and retry until it is gone
+ if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
+ Tables.clearCache(instance); // re-check with a cleared cache before giving up
+ if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
+ throw new AccumuloException("Table is online " + tableId + " cannot scan tablet in offline mode " + eloc.getFirst());
+ }
+ }
+
+ UtilWaitThread.sleep(250);
+
+ eloc = getTabletFiles(nextRange, relFiles); // NOTE(review): relFiles is not cleared between attempts and getTabletFiles only appends, so retries look like they can leave duplicate or stale file names - confirm
+ }
+
+ KeyExtent extent = eloc.getFirst();
+
+ if (!extent.getTableId().toString().equals(tableId)) {
+ throw new AccumuloException(" did not find tablets for table " + tableId + " " + extent);
+ }
+
+ if (currentExtent != null && !extent.isPreviousExtent(currentExtent))
+ throw new AccumuloException(" " + currentExtent + " is not previous extent " + extent);
-
- String tablesDir = Constants.getTablesDir(instance.getConfiguration());
++
++ String tablesDir = Constants.getTablesDir(config);
+ List<String> absFiles = new ArrayList<String>();
+ for (String relPath : relFiles) {
+ if (relPath.startsWith(".."))
+ absFiles.add(tablesDir + relPath.substring(2)); // "../..." entries are resolved against the tables directory itself
+ else
+ absFiles.add(tablesDir + "/" + tableId + relPath); // other entries are resolved under this table's directory
+ }
+
+ iter = createIterator(extent, absFiles);
+ iter.seek(range, LocalityGroupUtil.families(options.fetchedColumns), options.fetchedColumns.size() == 0 ? false : true);
+ currentExtent = extent;
+
+ }
+
+ private Pair<KeyExtent,String> getTabletFiles(Range nextRange, List<String> relFiles) throws TableNotFoundException { // reads one metadata row in nextRange: appends its data file names to relFiles and returns (extent, location)
+ Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
+ scanner.setBatchSize(100);
+ scanner.setRange(nextRange);
+
+ RowIterator rowIter = new RowIterator(scanner);
+ Iterator<Entry<Key,Value>> row = rowIter.next(); // NOTE(review): called without rowIter.hasNext(); presumably fails if the range holds no metadata row - confirm
+
+ KeyExtent extent = null;
+ String location = null;
+
+ while (row.hasNext()) {
+ Entry<Key,Value> entry = row.next();
+ Key key = entry.getKey();
+
+ if (key.getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) {
+ relFiles.add(key.getColumnQualifier().toString()); // the column qualifier is the file path; appended, never cleared here
+ }
+
+ if (key.getColumnFamily().equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY)
+ || key.getColumnFamily().equals(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY)) {
+ location = entry.getValue().toString(); // either a current or a future location counts
+ }
+
+ if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key)) {
+ extent = new KeyExtent(key.getRow(), entry.getValue());
+ }
+
+ }
+ return new Pair<KeyExtent,String>(extent, location); // either element stays null when the row lacks the corresponding column
+ }
+
+ /**
+ * Closes the readers of the previous tablet, opens a reader for every given file and builds the scan iterator
+ * stack on top of them: multi-iterator, deleting iterator, column family skipping, column qualifier filter,
+ * visibility filter, then the iterators loaded for scan scope from the table configuration and scan options.
+ *
+ * @param extent
+ * extent of the tablet the files belong to
+ * @param absFiles
+ * paths of the tablet's files, as passed to FileOperations.openReader
+ * @return the top of the iterator stack, combined with any side channels registered by the loaded iterators
+ * @throws AccumuloException
+ * declared; presumably raised while reading the table configuration - confirm
+ * @throws TableNotFoundException
+ * declared; presumably raised while reading the table configuration - confirm
+ * @throws IOException
+ * declared; presumably raised while closing or opening file readers - confirm
+ */
+ private SortedKeyValueIterator<Key,Value> createIterator(KeyExtent extent, List<String> absFiles) throws TableNotFoundException, AccumuloException,
+ IOException {
+
+ // TODO share code w/ tablet - ACCUMULO-1303
+ AccumuloConfiguration acuTableConf = AccumuloConfiguration.getTableConfiguration(conn, tableId);
+
+ Configuration conf = CachedConfiguration.getInstance();
-
- FileSystem fs = FileUtil.getFileSystem(conf, instance.getConfiguration());
-
++
++ FileSystem fs = FileUtil.getFileSystem(conf, config);
++
+ for (SortedKeyValueIterator<Key,Value> reader : readers) {
+ ((FileSKVIterator) reader).close(); // release the previous tablet's files before opening new ones
+ }
+
+ readers.clear();
+
+ // TODO need to close files - ACCUMULO-1303
+ for (String file : absFiles) {
+ FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs, conf, acuTableConf, null, null);
+ readers.add(reader);
+ }
+
+ MultiIterator multiIter = new MultiIterator(readers, extent);
+
+ OfflineIteratorEnvironment iterEnv = new OfflineIteratorEnvironment();
+
+ DeletingIterator delIter = new DeletingIterator(multiIter, false);
+
+ ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
+
+ ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, new HashSet<Column>(options.fetchedColumns));
+
+ byte[] defaultSecurityLabel;
+
+ ColumnVisibility cv = new ColumnVisibility(acuTableConf.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY));
+ defaultSecurityLabel = cv.getExpression(); // default visibility expression taken from the table configuration
+
+ VisibilityFilter visFilter = new VisibilityFilter(colFilter, authorizations, defaultSecurityLabel);
+
+ return iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, visFilter, extent, acuTableConf, options.serverSideIteratorList,
+ options.serverSideIteratorOptions, iterEnv, false));
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException(); // removal is not supported by this iterator
+ }
+
+}
+
+/**
+ * Scanner implementation whose iterator() returns an OfflineIterator, which reads the table's files directly
+ * and requires the table to be offline. Range, batch size and timeout are plain stored properties; isolation
+ * cannot be toggled.
+ */
+public class OfflineScanner extends ScannerOptions implements Scanner {
+
+ private int batchSize;
+ private int timeOut;
+ private Range range;
+
+ private Instance instance;
+ private TCredentials credentials;
+ private Authorizations authorizations;
+ private Text tableId;
+
+ public OfflineScanner(Instance instance, TCredentials credentials, String tableId, Authorizations authorizations) {
+ ArgumentChecker.notNull(instance, credentials, tableId, authorizations);
+ this.instance = instance;
+ this.credentials = credentials;
+ this.tableId = new Text(tableId);
+ this.range = new Range((Key) null, (Key) null); // default: unbounded range
+
+ this.authorizations = authorizations;
+
+ this.batchSize = Constants.SCAN_BATCH_SIZE;
+ this.timeOut = Integer.MAX_VALUE;
+ }
+
+ @Deprecated
+ @Override
+ public void setTimeOut(int timeOut) {
+ this.timeOut = timeOut; // stored only; not passed to the OfflineIterator created in iterator()
+ }
+
+ @Deprecated
+ @Override
+ public int getTimeOut() {
+ return timeOut;
+ }
+
+ @Override
+ public void setRange(Range range) {
+ this.range = range;
+ }
+
+ @Override
+ public Range getRange() {
+ return range;
+ }
+
+ @Override
+ public void setBatchSize(int size) {
+ this.batchSize = size; // stored only; not passed to the OfflineIterator created in iterator()
+ }
+
+ @Override
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ @Override
+ public void enableIsolation() {
+ // intentionally empty: this call has no effect on an offline scan
+ }
+
+ @Override
+ public void disableIsolation() {
+ // intentionally empty: this call has no effect on an offline scan
+ }
+
+ @Override
+ public Iterator<Entry<Key,Value>> iterator() {
+ return new OfflineIterator(this, instance, credentials, authorizations, tableId, range); // each call builds a new iterator from the current options and range
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/23bb4321/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
index f7b2263,0000000..75f140b
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
@@@ -1,442 -1,0 +1,433 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
- import org.apache.accumulo.core.conf.AccumuloConfiguration;
- import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.CredentialHelper;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.log4j.Level;
+
+/**
+ * Encapsulates an Accumulo range, together with optional per-split connection and scan settings, for use in MapReduce jobs.
+ */
+public class RangeInputSplit extends InputSplit implements Writable {
+ private Range range;
+ private String[] locations;
+ private String table, instanceName, zooKeepers, principal;
+ private AuthenticationToken token;
+ private Boolean offline, mockInstance, isolatedScan, localIterators;
+ private Authorizations auths;
+ private Set<Pair<Text,Text>> fetchedColumns;
+ private List<IteratorSetting> iterators;
+ private Level level;
+
+ public RangeInputSplit() {
+ range = new Range();
+ locations = new String[0];
+ }
+
+ public RangeInputSplit(Range range, String[] locations) {
+ this.range = range;
+ this.locations = locations;
+ }
+
+ public Range getRange() {
+ return range;
+ }
+
+ private static byte[] extractBytes(ByteSequence seq, int numBytes) { // copies the first numBytes bytes of seq into an array of numBytes + 1 bytes, behind a leading zero byte
+ byte[] bytes = new byte[numBytes + 1];
+ bytes[0] = 0; // leading zero byte, so the BigInteger built from this array by getProgress is never negative
+ for (int i = 0; i < numBytes; i++) {
+ if (i >= seq.length())
+ bytes[i + 1] = 0; // pad with zeros when seq is shorter than numBytes
+ else
+ bytes[i + 1] = seq.byteAt(i);
+ }
+ return bytes;
+ }
+
+ public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position) { // fraction (position - start) / (end - start), with each sequence read as an unsigned big-endian number
+ int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length()); // compare no more bytes than position has
+ BigInteger startBI = new BigInteger(extractBytes(start, maxDepth));
+ BigInteger endBI = new BigInteger(extractBytes(end, maxDepth));
+ BigInteger positionBI = new BigInteger(extractBytes(position, maxDepth));
+ return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue()); // NOTE(review): when start and end are equal over maxDepth bytes the denominator is 0, so the result is NaN or infinite - confirm callers tolerate that
+ }
+
+ public float getProgress(Key currentKey) { // estimates progress through this split's range from the first key component (row, family, qualifier) in which start and end differ
+ if (currentKey == null)
+ return 0f;
+ if (range.getStartKey() != null && range.getEndKey() != null) { // only finite ranges can be measured
+ if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW) != 0) {
+ // just look at the row progress
+ return getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(), currentKey.getRowData());
+ } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM) != 0) {
+ // just look at the column family progress
+ return getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData());
+ } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL) != 0) {
+ // just look at the column qualifier progress
+ return getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(), currentKey.getColumnQualifierData());
+ }
+ }
+ // if we can't figure it out, then claim no progress
+ return 0f;
+ }
+
+ /**
+ * This implementation of length is only an estimate; it does not provide exact values. Do not have your code rely on this return value.
+ * The value is derived from the XOR of the leading bytes (at most seven) of the start and stop rows; an
+ * infinite start or stop key is replaced by a single-byte row holding Byte.MIN_VALUE or Byte.MAX_VALUE.
+ */
+ @Override
+ public long getLength() throws IOException {
+ Text startRow = range.isInfiniteStartKey() ? new Text(new byte[] {Byte.MIN_VALUE}) : range.getStartKey().getRow();
+ Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow();
+ int maxCommon = Math.min(7, Math.min(startRow.getLength(), stopRow.getLength())); // at most 7 bytes, so the shifted value fits in a long
+ long diff = 0;
+
+ byte[] start = startRow.getBytes();
+ byte[] stop = stopRow.getBytes();
+ for (int i = 0; i < maxCommon; ++i) {
+ diff |= 0xff & (start[i] ^ stop[i]);
+ diff <<= Byte.SIZE; // shifts after every byte, including the last, leaving the low byte free
+ }
+
+ if (startRow.getLength() != stopRow.getLength())
+ diff |= 0xff; // rows of different length: fill the low byte
+
+ return diff + 1; // never returns 0 for non-negative diff
+ }
+
+ @Override
+ public String[] getLocations() throws IOException {
+ return locations;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException { // reads the range and locations, then each optional field, which is present only if the boolean flag before it is true; the order must mirror write()
+ range.readFields(in);
+ int numLocs = in.readInt();
+ locations = new String[numLocs];
+ for (int i = 0; i < numLocs; ++i)
+ locations[i] = in.readUTF();
+
+ if (in.readBoolean()) {
+ isolatedScan = in.readBoolean();
+ }
+
+ if (in.readBoolean()) {
+ offline = in.readBoolean();
+ }
+
+ if (in.readBoolean()) {
+ localIterators = in.readBoolean();
+ }
+
+ if (in.readBoolean()) {
+ mockInstance = in.readBoolean();
+ }
+
+ if (in.readBoolean()) {
+ int numColumns = in.readInt();
+ List<String> columns = new ArrayList<String>(numColumns);
+ for (int i = 0; i < numColumns; i++) {
+ columns.add(in.readUTF());
+ }
+
+ fetchedColumns = InputConfigurator.deserializeFetchedColumns(columns);
+ }
+
+ if (in.readBoolean()) {
+ String strAuths = in.readUTF();
+ auths = new Authorizations(strAuths.getBytes(Charset.forName("UTF-8"))); // rebuilt from the UTF-8 bytes of the serialized string
+ }
+
+ if (in.readBoolean()) {
+ principal = in.readUTF();
+ }
+
+ if (in.readBoolean()) {
+ String tokenClass = in.readUTF(); // token class name first, then the Base64-encoded token
+ byte[] base64TokenBytes = in.readUTF().getBytes(Charset.forName("UTF-8"));
+ byte[] tokenBytes = Base64.decodeBase64(base64TokenBytes);
+
+ try {
+ token = CredentialHelper.extractToken(tokenClass, tokenBytes);
+ } catch (AccumuloSecurityException e) {
+ throw new IOException(e); // surface token problems through the Writable contract's IOException
+ }
+ }
+
+ if (in.readBoolean()) {
+ instanceName = in.readUTF();
+ }
+
+ if (in.readBoolean()) {
+ zooKeepers = in.readUTF();
+ }
+
+ if (in.readBoolean()) {
+ level = Level.toLevel(in.readInt()); // log level is transported as its int value
+ }
+ } // NOTE(review): fields whose flag is false keep their previous value, and table/iterators are never read - confirm this is intended
+
+ @Override
+ public void write(DataOutput out) throws IOException { // writes the range and locations, then each optional field preceded by a boolean flag saying whether it is non-null
+ range.write(out);
+ out.writeInt(locations.length);
+ for (int i = 0; i < locations.length; ++i)
+ out.writeUTF(locations[i]);
+
+ out.writeBoolean(null != isolatedScan);
+ if (null != isolatedScan) {
+ out.writeBoolean(isolatedScan);
+ }
+
+ out.writeBoolean(null != offline);
+ if (null != offline) {
+ out.writeBoolean(offline);
+ }
+
+ out.writeBoolean(null != localIterators);
+ if (null != localIterators) {
+ out.writeBoolean(localIterators);
+ }
+
+ out.writeBoolean(null != mockInstance);
+ if (null != mockInstance) {
+ out.writeBoolean(mockInstance);
+ }
+
+ out.writeBoolean(null != fetchedColumns);
+ if (null != fetchedColumns) {
+ String[] cols = InputConfigurator.serializeColumns(fetchedColumns);
+ out.writeInt(cols.length); // column count, then one UTF string per column
+ for (String col : cols) {
+ out.writeUTF(col);
+ }
+ }
+
+ out.writeBoolean(null != auths);
+ if (null != auths) {
+ out.writeUTF(auths.serialize());
+ }
+
+ out.writeBoolean(null != principal);
+ if (null != principal) {
+ out.writeUTF(principal);
+ }
+
+ out.writeBoolean(null != token);
+ if (null != token) {
+ out.writeUTF(token.getClass().getCanonicalName()); // class name first, so the reader knows which token type to rebuild
+ try {
+ out.writeUTF(CredentialHelper.tokenAsBase64(token));
+ } catch (AccumuloSecurityException e) {
+ throw new IOException(e);
+ }
+ }
+
+ out.writeBoolean(null != instanceName);
+ if (null != instanceName) {
+ out.writeUTF(instanceName);
+ }
+
+ out.writeBoolean(null != zooKeepers);
+ if (null != zooKeepers) {
+ out.writeUTF(zooKeepers);
+ }
+
+ out.writeBoolean(null != level);
+ if (null != level) {
+ out.writeInt(level.toInt());
+ }
+ } // NOTE(review): the table and iterators fields are not written here, although toString() reports them - confirm they are not expected to survive serialization
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder(256);
+ sb.append("Range: ").append(range);
+ sb.append(" Locations: ").append(Arrays.asList(locations));
+ sb.append(" Table: ").append(table);
+ sb.append(" InstanceName: ").append(instanceName);
+ sb.append(" zooKeepers: ").append(zooKeepers);
+ sb.append(" principal: ").append(principal);
+ sb.append(" authenticationToken: ").append(token);
+ sb.append(" Authorizations: ").append(auths);
+ sb.append(" offlineScan: ").append(offline);
+ sb.append(" mockInstance: ").append(mockInstance);
+ sb.append(" isolatedScan: ").append(isolatedScan);
+ sb.append(" localIterators: ").append(localIterators);
+ sb.append(" fetchColumns: ").append(fetchedColumns);
+ sb.append(" iterators: ").append(iterators);
+ sb.append(" logLevel: ").append(level);
+ return sb.toString();
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public void setTable(String table) {
+ this.table = table;
+ }
+
+ public Instance getInstance() { // builds an Instance from the stored settings, or returns null when the instance name (or, for non-mock use, the zookeepers) is missing
+ if (null == instanceName) {
+ return null;
+ }
+
+ if (isMockInstance()) { // NOTE(review): isMockInstance() returns a Boolean that may be null; unboxing it here would then throw NullPointerException - confirm it is always set first
+ return new MockInstance(getInstanceName());
+ }
+
+ if (null == zooKeepers) {
+ return null;
+ }
-
- ZooKeeperInstance zki = new ZooKeeperInstance(getInstanceName(), getZooKeepers());
-
- // Wrap the DefaultConfiguration with a SiteConfiguration so we use accumulo-site.xml
- // when it's present
- AccumuloConfiguration xmlConfig = SiteConfiguration.getInstance(zki.getConfiguration());
- zki.setConfiguration(xmlConfig);
-
- return zki;
++
++ return new ZooKeeperInstance(getInstanceName(), getZooKeepers());
+ }
+
+ public String getInstanceName() {
+ return instanceName;
+ }
+
+ public void setInstanceName(String instanceName) {
+ this.instanceName = instanceName;
+ }
+
+ public String getZooKeepers() {
+ return zooKeepers;
+ }
+
+ public void setZooKeepers(String zooKeepers) {
+ this.zooKeepers = zooKeepers;
+ }
+
+ public String getPrincipal() {
+ return principal;
+ }
+
+ public void setPrincipal(String principal) {
+ this.principal = principal;
+ }
+
+ public AuthenticationToken getToken() {
+ return token;
+ }
+
+ public void setToken(AuthenticationToken token) {
+ this.token = token;
+ ; // NOTE(review): stray empty statement; has no effect and can be removed
+ }
+
+ public Boolean isOffline() {
+ return offline;
+ }
+
+ public void setOffline(Boolean offline) {
+ this.offline = offline;
+ }
+
+ public void setLocations(String[] locations) {
+ this.locations = locations;
+ }
+
+ public Boolean isMockInstance() {
+ return mockInstance;
+ }
+
+ public void setMockInstance(Boolean mockInstance) {
+ this.mockInstance = mockInstance;
+ }
+
+ public Boolean isIsolatedScan() {
+ return isolatedScan;
+ }
+
+ public void setIsolatedScan(Boolean isolatedScan) {
+ this.isolatedScan = isolatedScan;
+ }
+
+ public Authorizations getAuths() {
+ return auths;
+ }
+
+ public void setAuths(Authorizations auths) {
+ this.auths = auths;
+ }
+
+ public void setRange(Range range) {
+ this.range = range;
+ }
+
+ public Boolean usesLocalIterators() {
+ return localIterators;
+ }
+
+ public void setUsesLocalIterators(Boolean localIterators) {
+ this.localIterators = localIterators;
+ }
+
+ public Set<Pair<Text,Text>> getFetchedColumns() {
+ return fetchedColumns;
+ }
+
+ public void setFetchedColumns(Set<Pair<Text,Text>> fetchedColumns) {
+ this.fetchedColumns = fetchedColumns;
+ }
+
+ public List<IteratorSetting> getIterators() {
+ return iterators;
+ }
+
+ public void setIterators(List<IteratorSetting> iterators) {
+ this.iterators = iterators;
+ }
+
+ public Level getLogLevel() {
+ return level;
+ }
+
+ public void setLogLevel(Level level) {
+ this.level = level;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/23bb4321/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/ConfiguratorBase.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/ConfiguratorBase.java
index a38aecf,0000000..b1ae3a5
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/ConfiguratorBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/ConfiguratorBase.java
@@@ -1,281 -1,0 +1,275 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapreduce.lib.util;
+
+import java.nio.charset.Charset;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.security.CredentialHelper;
+import org.apache.accumulo.core.util.ArgumentChecker;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
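+/**
+ * Static helpers that store and retrieve connector, instance, and log level settings in a Hadoop {@link Configuration}, using keys prefixed with the simple
+ * name of the implementing class.
+ *
+ * @since 1.5.0
+ */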
+public class ConfiguratorBase {
+
+ /**
+ * Configuration keys for {@link Instance#getConnector(String, AuthenticationToken)}.
+ *
+ * <ul>
+ * <li>{@code IS_CONFIGURED}: boolean flag written by {@code setConnectorInfo} and read by {@code isConnectorInfoSet}</li>
+ * <li>{@code PRINCIPAL}: the principal passed to {@code setConnectorInfo}</li>
+ * <li>{@code TOKEN}: the authentication token, serialized to a Base64 string</li>
+ * <li>{@code TOKEN_CLASS}: the canonical class name of the authentication token</li>
+ * </ul>
+ *
+ * @since 1.5.0
+ */
+ public static enum ConnectorInfo {
+ IS_CONFIGURED, PRINCIPAL, TOKEN, TOKEN_CLASS
+ }
+
+ /**
+ * Configuration keys for {@link Instance}, {@link ZooKeeperInstance}, and {@link MockInstance}.
+ *
+ * <ul>
+ * <li>{@code TYPE}: the kind of instance that was configured; the setters in this class write {@code "ZooKeeperInstance"} or {@code "MockInstance"}</li>
+ * <li>{@code NAME}: the Accumulo instance name</li>
+ * <li>{@code ZOO_KEEPERS}: the zookeeper servers string, written only by {@code setZooKeeperInstance}</li>
+ * </ul>
+ *
+ * @since 1.5.0
+ */
+ protected static enum InstanceOpts {
+ TYPE, NAME, ZOO_KEEPERS;
+ }
+
+ /**
+ * Configuration keys for general configuration options.
+ *
+ * <ul>
+ * <li>{@code LOG_LEVEL}: the log level, stored as the integer returned by {@link Level#toInt()}</li>
+ * </ul>
+ *
+ * @since 1.5.0
+ */
+ protected static enum GeneralOpts {
+ LOG_LEVEL
+ }
+
+ /**
+  * Provides a configuration key for a given feature enum, prefixed by the implementingClass.
+  *
+  * <p>
+  * The key is the simple name of the implementing class, the simple name of the enum's declaring class, and the result of {@link StringUtils#camelize(String)}
+  * on the lower-cased constant name, joined with dots. The constant name is lower-cased with a fixed locale so that every JVM derives the same key; with the
+  * default locale, a constant containing {@code I} lower-cases to a dotless i under a Turkish locale and would produce a different key.
+  *
+  * @param implementingClass
+  *          the class whose name will be used as a prefix for the property configuration key
+  * @param e
+  *          the enum used to provide the unique part of the configuration key
+  * @return the configuration key
+  * @since 1.5.0
+  */
+ protected static String enumToConfKey(Class<?> implementingClass, Enum<?> e) {
+   // Locale is referenced by its fully-qualified name so that no additional import is needed.
+   String constantName = e.name().toLowerCase(java.util.Locale.ENGLISH);
+   return implementingClass.getSimpleName() + "." + e.getDeclaringClass().getSimpleName() + "." + StringUtils.camelize(constantName);
+ }
+
+ /**
+  * Sets the connector information needed to communicate with Accumulo in this job.
+  *
+  * <p>
+  * <b>WARNING:</b> The serialized token is stored in the configuration and shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe
+  * conversion to a string, and is not intended to be secure.
+  *
+  * <p>
+  * The token is serialized before anything is written, and the "is configured" flag is written last, so a failure part-way through does not leave the
+  * configuration marked as configured.
+  *
+  * @param implementingClass
+  *          the class whose name will be used as a prefix for the property configuration key
+  * @param conf
+  *          the Hadoop configuration object to configure
+  * @param principal
+  *          a valid Accumulo user name
+  * @param token
+  *          the user's authentication token
+  * @throws AccumuloSecurityException
+  *           presumably when the token cannot be serialized by {@code CredentialHelper.tokenAsBase64} (TODO confirm)
+  * @throws IllegalStateException
+  *           if connector info has already been set for the implementing class
+  * @since 1.5.0
+  */
+ public static void setConnectorInfo(Class<?> implementingClass, Configuration conf, String principal, AuthenticationToken token)
+     throws AccumuloSecurityException {
+   if (isConnectorInfoSet(implementingClass, conf))
+     throw new IllegalStateException("Connector info for " + implementingClass.getSimpleName() + " can only be set once per job");
+
+   ArgumentChecker.notNull(principal, token);
+
+   // Do the work that can fail before touching conf, so a failed call can simply be retried.
+   String serializedToken = CredentialHelper.tokenAsBase64(token);
+   String tokenClassName = token.getClass().getCanonicalName();
+
+   conf.set(enumToConfKey(implementingClass, ConnectorInfo.PRINCIPAL), principal);
+   conf.set(enumToConfKey(implementingClass, ConnectorInfo.TOKEN_CLASS), tokenClassName);
+   conf.set(enumToConfKey(implementingClass, ConnectorInfo.TOKEN), serializedToken);
+   // Written last: the flag must only be visible once every other value is in place.
+   conf.setBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), true);
+ }
+
+ /**
+  * Reports whether connector info has already been stored for the implementing class.
+  *
+  * @param implementingClass
+  *          the class whose name will be used as a prefix for the property configuration key
+  * @param conf
+  *          the Hadoop configuration object to read from
+  * @return true if the connector info has already been set, false otherwise (including when the flag is absent)
+  * @since 1.5.0
+  * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken)
+  */
+ public static Boolean isConnectorInfoSet(Class<?> implementingClass, Configuration conf) {
+   final String flagKey = enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED);
+   return conf.getBoolean(flagKey, false);
+ }
+
+ /**
+  * Looks up the user name stored in the configuration.
+  *
+  * @param implementingClass
+  *          the class whose name will be used as a prefix for the property configuration key
+  * @param conf
+  *          the Hadoop configuration object to read from
+  * @return the principal, as returned by the configuration lookup
+  * @since 1.5.0
+  * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken)
+  */
+ public static String getPrincipal(Class<?> implementingClass, Configuration conf) {
+   final String principalKey = enumToConfKey(implementingClass, ConnectorInfo.PRINCIPAL);
+   return conf.get(principalKey);
+ }
+
+ /**
+  * Looks up the class name of the serialized authentication token stored in the configuration.
+  *
+  * @param implementingClass
+  *          the class whose name will be used as a prefix for the property configuration key
+  * @param conf
+  *          the Hadoop configuration object to read from
+  * @return the token class name, as returned by the configuration lookup
+  * @since 1.5.0
+  * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken)
+  */
+ public static String getTokenClass(Class<?> implementingClass, Configuration conf) {
+   final String tokenClassKey = enumToConfKey(implementingClass, ConnectorInfo.TOKEN_CLASS);
+   return conf.get(tokenClassKey);
+ }
+
+ /**
+  * Reads the serialized authentication token from the configuration and BASE64-decodes it. WARNING: The token is stored in the Configuration and shared with
+  * all MapReduce tasks; it is BASE64 encoded to provide a charset safe conversion to a string, and is not intended to be secure.
+  *
+  * <p>
+  * When no token has been stored, the empty string is decoded instead.
+  *
+  * @param implementingClass
+  *          the class whose name will be used as a prefix for the property configuration key
+  * @param conf
+  *          the Hadoop configuration object to read from
+  * @return the decoded principal's authentication token
+  * @since 1.5.0
+  * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken)
+  */
+ public static byte[] getToken(Class<?> implementingClass, Configuration conf) {
+   final String tokenKey = enumToConfKey(implementingClass, ConnectorInfo.TOKEN);
+   final String encodedToken = conf.get(tokenKey, "");
+   final byte[] encodedBytes = encodedToken.getBytes(Charset.forName("UTF-8"));
+   return Base64.decodeBase64(encodedBytes);
+ }
+
+ /**
+  * Configures a {@link ZooKeeperInstance} for this job.
+  *
+  * <p>
+  * Nothing is written to the configuration unless both arguments pass the null check, so a rejected call can be repeated with corrected arguments.
+  *
+  * @param implementingClass
+  *          the class whose name will be used as a prefix for the property configuration key
+  * @param conf
+  *          the Hadoop configuration object to configure
+  * @param instanceName
+  *          the Accumulo instance name
+  * @param zooKeepers
+  *          a comma-separated list of zookeeper servers
+  * @throws IllegalStateException
+  *           if an instance type has already been stored for the implementing class
+  * @since 1.5.0
+  */
+ public static void setZooKeeperInstance(Class<?> implementingClass, Configuration conf, String instanceName, String zooKeepers) {
+   String key = enumToConfKey(implementingClass, InstanceOpts.TYPE);
+   if (!conf.get(key, "").isEmpty())
+     throw new IllegalStateException("Instance info can only be set once per job; it has already been configured with " + conf.get(key));
+
+   // Validate before writing: otherwise a rejected null argument would leave the type key set and block any corrected retry.
+   ArgumentChecker.notNull(instanceName, zooKeepers);
+
+   conf.set(key, "ZooKeeperInstance");
+   conf.set(enumToConfKey(implementingClass, InstanceOpts.NAME), instanceName);
+   conf.set(enumToConfKey(implementingClass, InstanceOpts.ZOO_KEEPERS), zooKeepers);
+ }
+
+ /**
+  * Configures a {@link MockInstance} for this job.
+  *
+  * <p>
+  * Nothing is written to the configuration unless the instance name passes the null check, so a rejected call can be repeated with a corrected argument.
+  *
+  * @param implementingClass
+  *          the class whose name will be used as a prefix for the property configuration key
+  * @param conf
+  *          the Hadoop configuration object to configure
+  * @param instanceName
+  *          the Accumulo instance name
+  * @throws IllegalStateException
+  *           if an instance type has already been stored for the implementing class
+  * @since 1.5.0
+  */
+ public static void setMockInstance(Class<?> implementingClass, Configuration conf, String instanceName) {
+   String key = enumToConfKey(implementingClass, InstanceOpts.TYPE);
+   if (!conf.get(key, "").isEmpty())
+     throw new IllegalStateException("Instance info can only be set once per job; it has already been configured with " + conf.get(key));
+
+   // Validate before writing: otherwise a rejected null argument would leave the type key set and block any corrected retry.
+   ArgumentChecker.notNull(instanceName);
+
+   conf.set(key, "MockInstance");
+   conf.set(enumToConfKey(implementingClass, InstanceOpts.NAME), instanceName);
+ }
+
+ /**
+ * Initializes an Accumulo {@link Instance} based on the configuration.
+ *
+ * <p>
+ * The stored instance type selects the implementation: {@code "MockInstance"} yields a {@link MockInstance} built from the stored name, and
+ * {@code "ZooKeeperInstance"} yields a {@link ZooKeeperInstance} built from the stored name and zookeeper servers.
+ *
+ * @param implementingClass
+ * the class whose name will be used as a prefix for the property configuration key
+ * @param conf
+ * the Hadoop configuration object to read the instance settings from
+ * @return an Accumulo instance
+ * @throws IllegalStateException
+ * if no instance type has been stored for the implementing class, or if the stored type is not one of the two recognized values
+ * @since 1.5.0
+ * @see #setZooKeeperInstance(Class, Configuration, String, String)
+ * @see #setMockInstance(Class, Configuration, String)
+ */
+ public static Instance getInstance(Class<?> implementingClass, Configuration conf) {
+ String instanceType = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE), "");
+ if ("MockInstance".equals(instanceType))
+ return new MockInstance(conf.get(enumToConfKey(implementingClass, InstanceOpts.NAME)));
+ else if ("ZooKeeperInstance".equals(instanceType)) {
- ZooKeeperInstance zki = new ZooKeeperInstance(conf.get(enumToConfKey(implementingClass, InstanceOpts.NAME)), conf.get(enumToConfKey(implementingClass,
++ return new ZooKeeperInstance(conf.get(enumToConfKey(implementingClass, InstanceOpts.NAME)), conf.get(enumToConfKey(implementingClass,
+ InstanceOpts.ZOO_KEEPERS)));
-
- // Wrap the DefaultConfiguration with a SiteConfiguration
- AccumuloConfiguration xmlConfig = SiteConfiguration.getInstance(zki.getConfiguration());
- zki.setConfiguration(xmlConfig);
-
- return zki;
+ } else if (instanceType.isEmpty())
+ throw new IllegalStateException("Instance has not been configured for " + implementingClass.getSimpleName());
+ else
+ throw new IllegalStateException("Unrecognized instance type " + instanceType);
+ }
+
+ /**
+  * Applies the given log level to the implementing class's logger and records it in the configuration for this job.
+  *
+  * @param implementingClass
+  *          the class whose name will be used as a prefix for the property configuration key
+  * @param conf
+  *          the Hadoop configuration object to configure
+  * @param level
+  *          the logging level; checked with {@code ArgumentChecker.notNull} before use
+  * @since 1.5.0
+  */
+ public static void setLogLevel(Class<?> implementingClass, Configuration conf, Level level) {
+   ArgumentChecker.notNull(level);
+
+   // Takes effect immediately for the logger of the implementing class in this JVM.
+   final Logger classLogger = Logger.getLogger(implementingClass);
+   classLogger.setLevel(level);
+
+   // Stored as an int so that it can be read back with getLogLevel.
+   final String levelKey = enumToConfKey(implementingClass, GeneralOpts.LOG_LEVEL);
+   conf.setInt(levelKey, level.toInt());
+ }
+
+ /**
+  * Reads the log level stored in this configuration, using the integer value of {@link Level#INFO} when none has been stored.
+  *
+  * @param implementingClass
+  *          the class whose name will be used as a prefix for the property configuration key
+  * @param conf
+  *          the Hadoop configuration object to read from
+  * @return the log level
+  * @since 1.5.0
+  * @see #setLogLevel(Class, Configuration, Level)
+  */
+ public static Level getLogLevel(Class<?> implementingClass, Configuration conf) {
+   final String levelKey = enumToConfKey(implementingClass, GeneralOpts.LOG_LEVEL);
+   final int storedLevel = conf.getInt(levelKey, Level.INFO.toInt());
+   return Level.toLevel(storedLevel);
+ }
+
+}