You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2014/04/09 19:58:28 UTC
[57/64] [abbrv] git commit: Merge branch '1.5.2-SNAPSHOT' into
1.6.0-SNAPSHOT
Merge branch '1.5.2-SNAPSHOT' into 1.6.0-SNAPSHOT
Conflicts:
core/src/main/java/org/apache/accumulo/core/Constants.java
core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
core/src/main/java/org/apache/accumulo/core/data/ColumnUpdate.java
core/src/main/java/org/apache/accumulo/core/data/KeyExtent.java
core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Chunk.java
core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/TFile.java
core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/TFileDumper.java
core/src/main/java/org/apache/accumulo/core/iterators/user/TransformingIterator.java
core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModule.java
core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
core/src/test/java/org/apache/accumulo/core/util/shell/command/FormatterCommandTest.java
examples/simple/src/main/java/org/apache/accumulo/examples/simple/shard/Query.java
minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java
server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java
server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java
server/base/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java
server/base/src/main/java/org/apache/accumulo/server/util/SendLogToChainsaw.java
server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java
server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java
server/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java
server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java
server/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
start/src/main/java/org/apache/accumulo/start/classloader/vfs/providers/HdfsFileSystem.java
test/src/main/java/org/apache/accumulo/test/GetMasterStats.java
test/src/main/java/org/apache/accumulo/test/functional/RunTests.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/716ea0ee
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/716ea0ee
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/716ea0ee
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 716ea0ee8b26bf504d0cf9e90fc1d3d8579bc50a
Parents: 3934ea6 9261338
Author: Christopher Tubbs <ct...@apache.org>
Authored: Wed Apr 9 13:36:22 2014 -0400
Committer: Christopher Tubbs <ct...@apache.org>
Committed: Wed Apr 9 13:36:22 2014 -0400
----------------------------------------------------------------------
.../core/client/ClientSideIteratorScanner.java | 2 -
.../accumulo/core/client/ConditionalWriter.java | 2 -
.../core/client/ConditionalWriterConfig.java | 2 -
.../apache/accumulo/core/client/Connector.java | 2 -
.../accumulo/core/client/IteratorSetting.java | 4 -
.../accumulo/core/client/RowIterator.java | 4 -
.../accumulo/core/client/ScannerBase.java | 1 -
.../core/client/admin/ActiveCompaction.java | 1 -
.../core/client/admin/InstanceOperations.java | 12 --
.../core/client/admin/TableOperations.java | 30 ----
.../core/client/admin/TableOperationsImpl.java | 4 -
.../core/client/impl/ThriftTransportPool.java | 16 +-
.../core/client/mapred/AbstractInputFormat.java | 2 -
.../client/mapred/AccumuloOutputFormat.java | 2 -
.../client/mapreduce/AbstractInputFormat.java | 3 +-
.../client/mapreduce/AccumuloOutputFormat.java | 2 -
.../core/client/mapreduce/InputTableConfig.java | 3 -
.../mapreduce/lib/util/ConfiguratorBase.java | 2 -
.../core/client/mock/MockBatchDeleter.java | 4 -
.../apache/accumulo/core/data/Condition.java | 7 -
.../java/org/apache/accumulo/core/data/Key.java | 7 +-
.../org/apache/accumulo/core/data/Range.java | 17 +-
.../accumulo/core/file/rfile/BlockIndex.java | 5 -
.../accumulo/core/file/rfile/bcfile/BCFile.java | 15 +-
.../core/file/rfile/bcfile/ByteArray.java | 2 -
.../accumulo/core/file/rfile/bcfile/Utils.java | 11 --
.../core/iterators/TypedValueCombiner.java | 6 -
.../core/iterators/ValueFormatException.java | 6 -
.../core/iterators/system/MapFileIterator.java | 8 +-
.../core/iterators/user/GrepIterator.java | 3 -
.../iterators/user/IntersectingIterator.java | 10 --
.../accumulo/core/iterators/user/RowFilter.java | 1 -
.../iterators/user/TransformingIterator.java | 164 +++++++++----------
.../core/iterators/user/VersioningIterator.java | 3 -
.../accumulo/core/security/SecurityUtil.java | 1 -
.../security/crypto/CryptoModuleFactory.java | 1 -
.../security/crypto/CryptoModuleParameters.java | 6 -
.../core/client/impl/ScannerOptionsTest.java | 2 -
.../client/lexicoder/ReverseLexicoderTest.java | 2 -
.../client/mapred/AccumuloInputFormatTest.java | 4 -
.../mapreduce/AccumuloInputFormatTest.java | 4 -
.../core/client/mock/MockNamespacesTest.java | 8 -
.../core/security/VisibilityConstraintTest.java | 3 -
.../simple/client/RandomBatchScanner.java | 5 -
.../simple/client/RandomBatchWriter.java | 4 -
.../simple/client/SequentialBatchWriter.java | 5 -
.../simple/client/TraceDumpExample.java | 9 +-
.../examples/simple/dirlist/QueryUtil.java | 3 -
.../examples/simple/mapreduce/NGramIngest.java | 3 -
.../examples/simple/mapreduce/TableToFile.java | 1 -
.../accumulo/examples/simple/shard/Query.java | 3 -
.../minicluster/MiniAccumuloRunner.java | 2 -
.../accumulo/proxy/TestProxyReadWrite.java | 10 --
.../accumulo/server/conf/ConfigSanityCheck.java | 3 -
.../server/master/balancer/TabletBalancer.java | 2 -
.../server/master/state/TabletStateStore.java | 7 -
.../server/metrics/AbstractMetricsImpl.java | 4 -
.../server/security/handler/ZKAuthorizor.java | 4 -
.../server/security/handler/ZKPermHandler.java | 4 -
.../accumulo/server/util/LoginProperties.java | 3 -
.../accumulo/server/util/RestoreZookeeper.java | 4 -
.../accumulo/server/util/TableDiskUsage.java | 3 -
.../accumulo/server/util/TabletServerLocks.java | 3 -
.../org/apache/accumulo/tserver/MemValue.java | 4 +-
.../apache/accumulo/tserver/TabletServer.java | 16 +-
.../tserver/compaction/CompactionStrategy.java | 4 -
.../accumulo/tserver/logger/LogReader.java | 1 -
.../start/classloader/AccumuloClassLoader.java | 3 -
.../start/classloader/vfs/ContextManager.java | 2 -
.../vfs/PostDelegatingVFSClassLoader.java | 7 +-
.../vfs/providers/HdfsFileSystem.java | 5 -
.../accumulo/test/NativeMapPerformanceTest.java | 3 -
.../accumulo/test/NativeMapStressTest.java | 3 -
.../test/continuous/ContinuousMoru.java | 1 -
.../test/continuous/ContinuousVerify.java | 1 -
.../test/functional/CacheTestClean.java | 3 -
.../accumulo/test/randomwalk/Framework.java | 3 -
.../apache/accumulo/test/randomwalk/Node.java | 1 -
.../randomwalk/concurrent/CheckBalance.java | 5 +-
79 files changed, 124 insertions(+), 414 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
index 5fdccf0,0000000..95f73bb
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
@@@ -1,141 -1,0 +1,139 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client;
+
+import java.util.Iterator;
+
+import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
+import org.apache.accumulo.core.data.ConditionalMutation;
+
+/**
+ * ConditionalWriter provides the ability to do efficient, atomic read-modify-write operations on rows. These operations are performed on the tablet server
+ * while a row lock is held.
+ *
+ * @since 1.6.0
+ */
+public interface ConditionalWriter {
+ class Result {
+
+ private Status status;
+ private ConditionalMutation mutation;
+ private String server;
+ private Exception exception;
+
+ public Result(Status s, ConditionalMutation m, String server) {
+ this.status = s;
+ this.mutation = m;
+ this.server = server;
+ }
+
+ public Result(Exception e, ConditionalMutation cm, String server) {
+ this.exception = e;
+ this.mutation = cm;
+ this.server = server;
+ }
+
+ /**
+ * If this method throws an exception, then it's possible the mutation is still being actively processed. Therefore if code chooses to continue after seeing
+ * an exception it should take this into consideration.
+ *
+ * @return status of a conditional mutation
- * @throws AccumuloException
- * @throws AccumuloSecurityException
+ */
+
+ public Status getStatus() throws AccumuloException, AccumuloSecurityException {
+ if (status == null) {
+ if (exception instanceof AccumuloException)
+ throw new AccumuloException(exception);
+ if (exception instanceof AccumuloSecurityException) {
+ AccumuloSecurityException ase = (AccumuloSecurityException) exception;
+ throw new AccumuloSecurityException(ase.getUser(), SecurityErrorCode.valueOf(ase.getSecurityErrorCode().name()), ase.getTableInfo(), ase);
+ }
+ else
+ throw new AccumuloException(exception);
+ }
+
+ return status;
+ }
+
+ /**
+ *
+ * @return A copy of the mutation previously submitted by a user. The mutation will reference the same data, but the object may be different.
+ */
+ public ConditionalMutation getMutation() {
+ return mutation;
+ }
+
+ /**
+ *
+ * @return The server this mutation was sent to. Returns null if the mutation was not sent to a server.
+ */
+ public String getTabletServer() {
+ return server;
+ }
+ }
+
+ public static enum Status {
+ /**
+ * conditions were met and mutation was written
+ */
+ ACCEPTED,
+ /**
+ * conditions were not met and mutation was not written
+ */
+ REJECTED,
+ /**
+ * mutation violated a constraint and was not written
+ */
+ VIOLATED,
+ /**
+ * error occurred after mutation was sent to server; it's unknown if the mutation was written. Although the status of the mutation is unknown, Accumulo
+ * guarantees the mutation will not be written at a later point in time.
+ */
+ UNKNOWN,
+ /**
+ * A condition contained a column visibility that could never be seen
+ */
+ INVISIBLE_VISIBILITY,
+
+ }
+
+ /**
+ * This method returns one result for each mutation passed to it. This method is thread safe. Multiple threads can safely use a single conditional writer.
+ * Sharing a conditional writer between multiple threads may result in batching of requests to tablet servers.
+ *
+ * @param mutations
+ * @return Result for each mutation submitted. The mutations may still be processing in the background when this method returns, if so the iterator will
+ * block.
+ */
+ Iterator<Result> write(Iterator<ConditionalMutation> mutations);
+
+ /**
+ * This method has the same thread safety guarantees as {@link #write(Iterator)}
+ *
+ *
+ * @param mutation
+ * @return Result for the submitted mutation
+ */
+
+ Result write(ConditionalMutation mutation);
+
+ /**
+ * release any resources (like thread pools) used by conditional writer
+ */
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java
index f2a91ea,0000000..a220e62
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java
@@@ -1,118 -1,0 +1,116 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.ArgumentChecker;
+
+/**
+ *
+ * @since 1.6.0
+ */
+public class ConditionalWriterConfig {
+
+ private static final Long DEFAULT_TIMEOUT = Long.MAX_VALUE;
+ private Long timeout = null;
+
+ private static final Integer DEFAULT_MAX_WRITE_THREADS = 3;
+ private Integer maxWriteThreads = null;
+
+ private Authorizations auths = Authorizations.EMPTY;
+
+ /**
+ * A set of authorization labels that will be checked against the column visibility of each key in order to filter data. The authorizations passed in must be
+ * a subset of the accumulo user's set of authorizations. If the accumulo user has authorizations (A1, A2) and authorizations (A2, A3) are passed, then an
+ * exception will be thrown.
+ *
+ * <p>
+ * Any condition that is not visible with this set of authorizations will fail.
- *
- * @param auths
+ */
+ public ConditionalWriterConfig setAuthorizations(Authorizations auths) {
+ ArgumentChecker.notNull(auths);
+ this.auths = auths;
+ return this;
+ }
+
+ /**
+ * Sets the maximum amount of time an unresponsive server will be re-tried. When this timeout is exceeded, the {@link ConditionalWriter} should return the
+ * mutation with an exception.<br />
+ * For no timeout, set to zero, or {@link Long#MAX_VALUE} with {@link TimeUnit#MILLISECONDS}.
+ *
+ * <p>
+ * {@link TimeUnit#MICROSECONDS} or {@link TimeUnit#NANOSECONDS} will be truncated to the nearest {@link TimeUnit#MILLISECONDS}.<br />
+ * If this truncation would result in making the value zero when it was specified as non-zero, then a minimum value of one {@link TimeUnit#MILLISECONDS} will
+ * be used.
+ *
+ * <p>
+ * <b>Default:</b> {@link Long#MAX_VALUE} (no timeout)
+ *
+ * @param timeout
+ * the timeout, in the unit specified by the value of {@code timeUnit}
+ * @param timeUnit
+ * determines how {@code timeout} will be interpreted
+ * @throws IllegalArgumentException
+ * if {@code timeout} is less than 0
+ * @return {@code this} to allow chaining of set methods
+ */
+ public ConditionalWriterConfig setTimeout(long timeout, TimeUnit timeUnit) {
+ if (timeout < 0)
+ throw new IllegalArgumentException("Negative timeout not allowed " + timeout);
+
+ if (timeout == 0)
+ this.timeout = Long.MAX_VALUE;
+ else
+ // make small, positive values that truncate to 0 when converted use the minimum millis instead
+ this.timeout = Math.max(1, timeUnit.toMillis(timeout));
+ return this;
+ }
+
+ /**
+ * Sets the maximum number of threads to use for writing data to the tablet servers.
+ *
+ * <p>
+ * <b>Default:</b> 3
+ *
+ * @param maxWriteThreads
+ * the maximum threads to use
+ * @throws IllegalArgumentException
+ * if {@code maxWriteThreads} is non-positive
+ * @return {@code this} to allow chaining of set methods
+ */
+ public ConditionalWriterConfig setMaxWriteThreads(int maxWriteThreads) {
+ if (maxWriteThreads <= 0)
+ throw new IllegalArgumentException("Max threads must be positive " + maxWriteThreads);
+
+ this.maxWriteThreads = maxWriteThreads;
+ return this;
+ }
+
+ public Authorizations getAuthorizations() {
+ return auths;
+ }
+
+ public long getTimeout(TimeUnit timeUnit) {
+ return timeUnit.convert(timeout != null ? timeout : DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+
+ public int getMaxWriteThreads() {
+ return maxWriteThreads != null ? maxWriteThreads : DEFAULT_MAX_WRITE_THREADS;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/core/src/main/java/org/apache/accumulo/core/client/Connector.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/Connector.java
index 92a1184,3189d44..4a2acff
--- a/core/src/main/java/org/apache/accumulo/core/client/Connector.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/Connector.java
@@@ -88,13 -87,12 +88,12 @@@ public abstract class Connector
* @param config
* configuration used to create batch writer
* @return BatchDeleter object for configuring and deleting
- * @throws TableNotFoundException
* @since 1.5.0
*/
-
+
public abstract BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, int numQueryThreads, BatchWriterConfig config)
throws TableNotFoundException;
-
+
/**
* Factory method to create a BatchWriter connected to Accumulo.
*
@@@ -123,12 -121,11 +122,11 @@@
* @param config
* configuration used to create batch writer
* @return BatchWriter object for configuring and writing data to
- * @throws TableNotFoundException
* @since 1.5.0
*/
-
+
public abstract BatchWriter createBatchWriter(String tableName, BatchWriterConfig config) throws TableNotFoundException;
-
+
/**
* Factory method to create a Multi-Table BatchWriter connected to Accumulo. Multi-table batch writers can queue data for multiple tables, which is good for
* ingesting data into multiple tables from the same source
http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/core/src/main/java/org/apache/accumulo/core/client/IteratorSetting.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/IteratorSetting.java
index 7a98df2,e58a1be..e69f3dd
--- a/core/src/main/java/org/apache/accumulo/core/client/IteratorSetting.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/IteratorSetting.java
@@@ -82,11 -82,9 +82,9 @@@ public class IteratorSetting implement
public String getName() {
return name;
}
-
+
/**
* Set the iterator's name. Must be a simple alphanumeric identifier.
- *
- * @param name
*/
public void setName(String name) {
ArgumentChecker.notNull(name);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
index afa539a,29ff2a6..7b58bd4
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
@@@ -58,21 -58,17 +58,17 @@@ public interface InstanceOperations
*
* @return A map of system properties set in zookeeper. If a property is not set in zookeeper, then it will return the value set in accumulo-site.xml on some
* server. If nothing is set in an accumulo-site.xml file it will return the default value for each property.
- * @throws AccumuloException
- * @throws AccumuloSecurityException
*/
- public Map<String,String> getSystemConfiguration() throws AccumuloException, AccumuloSecurityException;
+ Map<String,String> getSystemConfiguration() throws AccumuloException, AccumuloSecurityException;
/**
*
* @return A map of system properties set in accumulo-site.xml on some server. If nothing is set in an accumulo-site.xml file it will return the default value
* for each property.
- * @throws AccumuloException
- * @throws AccumuloSecurityException
*/
- public Map<String,String> getSiteConfiguration() throws AccumuloException, AccumuloSecurityException;
+ Map<String,String> getSiteConfiguration() throws AccumuloException, AccumuloSecurityException;
/**
* List the currently active tablet servers participating in the accumulo instance
@@@ -88,11 -84,9 +84,9 @@@
* @param tserver
* The tablet server address should be of the form <ip address>:<port>
* @return A list of active scans on tablet server.
- * @throws AccumuloException
- * @throws AccumuloSecurityException
*/
- public List<ActiveScan> getActiveScans(String tserver) throws AccumuloException, AccumuloSecurityException;
+ List<ActiveScan> getActiveScans(String tserver) throws AccumuloException, AccumuloSecurityException;
/**
* List the active compaction running on a tablet server
@@@ -112,20 -104,16 +104,16 @@@
*
* @param tserver
* The tablet server address should be of the form <ip address>:<port>
- * @throws AccumuloException
* @since 1.5.0
*/
- public void ping(String tserver) throws AccumuloException;
+ void ping(String tserver) throws AccumuloException;
/**
* Test to see if the instance can load the given class as the given type. This check does not consider per table classpaths, see
* {@link TableOperations#testClassLoad(String, String, String)}
*
- * @param className
- * @param asTypeName
* @return true if the instance can load the given class as the given type, false otherwise
- * @throws AccumuloException
*/
- public boolean testClassLoad(final String className, final String asTypeName) throws AccumuloException, AccumuloSecurityException;
+ boolean testClassLoad(final String className, final String asTypeName) throws AccumuloException, AccumuloSecurityException;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
index 6166673,0823656..d9919ef
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
@@@ -109,13 -109,10 +109,10 @@@ public interface TableOperations
* Name of a table to create and import into.
* @param importDir
* Directory that contains the files copied by distcp from exportTable
- * @throws TableExistsException
- * @throws AccumuloException
- * @throws AccumuloSecurityException
* @since 1.5.0
*/
- public void importTable(String tableName, String importDir) throws TableExistsException, AccumuloException, AccumuloSecurityException;
-
+ void importTable(String tableName, String importDir) throws TableExistsException, AccumuloException, AccumuloSecurityException;
+
/**
* Exports a table. The tables data is not exported, just table metadata and a list of files to distcp. The table being exported must be offline and stay
* offline for the duration of distcp. To avoid losing access to a table it can be cloned and the clone taken offline for export.
@@@ -127,12 -124,9 +124,9 @@@
* Name of the table to export.
* @param exportDir
* An empty directory in HDFS where files containing table metadata and list of files to distcp will be placed.
- * @throws TableNotFoundException
- * @throws AccumuloException
- * @throws AccumuloSecurityException
* @since 1.5.0
*/
- public void exportTable(String tableName, String exportDir) throws TableNotFoundException, AccumuloException, AccumuloSecurityException;
+ void exportTable(String tableName, String exportDir) throws TableNotFoundException, AccumuloException, AccumuloSecurityException;
/**
* Ensures that tablets are split along a set of keys.
@@@ -212,11 -205,10 +205,10 @@@
* @throws AccumuloSecurityException
* if the user does not have permission
* @return the split points (end-row names) for the table's current split profile, grouped into fewer splits so as not to exceed maxSplits
- * @throws TableNotFoundException
* @since 1.5.0
*/
- public Collection<Text> listSplits(String tableName, int maxSplits) throws TableNotFoundException, AccumuloSecurityException, AccumuloException;
-
+ Collection<Text> listSplits(String tableName, int maxSplits) throws TableNotFoundException, AccumuloSecurityException, AccumuloException;
+
/**
* Finds the max row within a given range. To find the max row in a table, pass null for start and end row.
*
@@@ -233,14 -224,10 +224,10 @@@
* determines if the end row is included
*
* @return The max row in the range, or null if there is no visible data in the range.
- *
- * @throws AccumuloSecurityException
- * @throws AccumuloException
- * @throws TableNotFoundException
*/
- public Text getMaxRow(String tableName, Authorizations auths, Text startRow, boolean startInclusive, Text endRow, boolean endInclusive)
+ Text getMaxRow(String tableName, Authorizations auths, Text startRow, boolean startInclusive, Text endRow, boolean endInclusive)
throws TableNotFoundException, AccumuloException, AccumuloSecurityException;
-
+
/**
* Merge tablets between (start, end]
*
@@@ -401,10 -388,9 +388,9 @@@
* if a general error occurs
* @throws AccumuloSecurityException
* if the user does not have permission
- * @throws TableNotFoundException
*/
- public void flush(String tableName, Text start, Text end, boolean wait) throws AccumuloException, AccumuloSecurityException, TableNotFoundException;
-
+ void flush(String tableName, Text start, Text end, boolean wait) throws AccumuloException, AccumuloSecurityException, TableNotFoundException;
+
/**
* Sets a property on a table. Note that it may take a short period of time (a second) to propagate the change everywhere.
*
@@@ -527,10 -513,9 +513,9 @@@
* when there is a general accumulo error
* @throws AccumuloSecurityException
* when the user does not have the proper permissions
- * @throws TableNotFoundException
*/
- public void offline(String tableName) throws AccumuloSecurityException, AccumuloException, TableNotFoundException;
-
+ void offline(String tableName) throws AccumuloSecurityException, AccumuloException, TableNotFoundException;
+
/**
*
* @param tableName
@@@ -555,25 -524,9 +540,24 @@@
* when there is a general accumulo error
* @throws AccumuloSecurityException
* when the user does not have the proper permissions
- * @throws TableNotFoundException
*/
- public void online(String tableName) throws AccumuloSecurityException, AccumuloException, TableNotFoundException;
-
+ void online(String tableName) throws AccumuloSecurityException, AccumuloException, TableNotFoundException;
+
+ /**
+ *
+ * @param tableName
+ * the table to take online
+ * @param wait
+ * if true, then will not return until table is online
+ * @throws AccumuloException
+ * when there is a general accumulo error
+ * @throws AccumuloSecurityException
+ * when the user does not have the proper permissions
+ * @throws TableNotFoundException
+ * @since 1.6.0
+ */
+ void online(String tableName, boolean wait) throws AccumuloSecurityException, AccumuloException, TableNotFoundException;
+
/**
* Clears the tablet locator cache for a specified table
*
@@@ -669,12 -618,9 +649,9 @@@
* @param tableName
* the name of the table
* @return a set of iterator names
- * @throws AccumuloSecurityException
- * @throws AccumuloException
- * @throws TableNotFoundException
*/
- public Map<String,EnumSet<IteratorScope>> listIterators(String tableName) throws AccumuloSecurityException, AccumuloException, TableNotFoundException;
-
+ Map<String,EnumSet<IteratorScope>> listIterators(String tableName) throws AccumuloSecurityException, AccumuloException, TableNotFoundException;
+
/**
* Check whether a given iterator configuration conflicts with existing configuration; in particular, determine if the name or priority are already in use for
* the specified scopes.
@@@ -700,11 -646,10 +677,10 @@@
* thrown if the constraint has already been added to the table or if there are errors in the configuration of existing constraints
* @throws AccumuloSecurityException
* thrown if the user doesn't have permission to add the constraint
- * @throws TableNotFoundException
* @since 1.5.0
*/
- public int addConstraint(String tableName, String constraintClassName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException;
-
+ int addConstraint(String tableName, String constraintClassName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException;
+
/**
* Remove a constraint from a table.
*
@@@ -727,23 -671,10 +702,22 @@@
* @return a map from constraint class name to assigned number
* @throws AccumuloException
* thrown if there are errors in the configuration of existing constraints
- * @throws TableNotFoundException
* @since 1.5.0
*/
- public Map<String,Integer> listConstraints(String tableName) throws AccumuloException, TableNotFoundException;
-
+ Map<String,Integer> listConstraints(String tableName) throws AccumuloException, TableNotFoundException;
+
+ /**
+ * Gets the number of bytes being used in the files for a set of tables
+ *
+ * @param tables
+ * a set of tables
+ * @return a list of disk usage objects containing linked table names and sizes
+ * @throws AccumuloException
+ * @throws AccumuloSecurityException
+ * @since 1.6.0
+ */
+ List<DiskUsage> getDiskUsage(Set<String> tables) throws AccumuloException, AccumuloSecurityException, TableNotFoundException;
+
/**
* Test to see if the instance can load the given class as the given type. This check uses the table classpath if it is set.
*
http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java
index 9d033e2,442f1be..3d69cc1
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java
@@@ -1316,14 -1129,12 +1314,13 @@@ public class TableOperationsImpl extend
* when there is a general accumulo error
* @throws AccumuloSecurityException
* when the user does not have the proper permissions
- * @throws TableNotFoundException
*/
@Override
- public void offline(String tableName) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
+ public void offline(String tableName, boolean wait) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
ArgumentChecker.notNull(tableName);
- List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes(Constants.UTF8)));
+ String tableId = Tables.getTableId(instance, tableName);
+ List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getBytes(Constants.UTF8)));
Map<String,String> opts = new HashMap<String,String>();
try {
@@@ -1350,13 -1153,11 +1347,12 @@@
* when there is a general accumulo error
* @throws AccumuloSecurityException
* when the user does not have the proper permissions
- * @throws TableNotFoundException
*/
@Override
- public void online(String tableName) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
+ public void online(String tableName, boolean wait) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
ArgumentChecker.notNull(tableName);
- List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes(Constants.UTF8)));
+ String tableId = Tables.getTableId(instance, tableName);
+ List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getBytes(Constants.UTF8)));
Map<String,String> opts = new HashMap<String,String>();
try {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
index ef7bcab,0000000..dad62ca
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
@@@ -1,649 -1,0 +1,647 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapred;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientSideIteratorScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableDeletedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.client.impl.OfflineScanner;
+import org.apache.accumulo.core.client.impl.ScannerImpl;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.TabletLocator;
+import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
+import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * An abstract input format to provide shared methods common to all other input format classes. At the very least, any classes inheriting from this class will
+ * need to define their own {@link RecordReader}.
+ */
+public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
+ protected static final Class<?> CLASS = AccumuloInputFormat.class;
+ protected static final Logger log = Logger.getLogger(CLASS);
+
+ /**
+ * Sets the connector information needed to communicate with Accumulo in this job.
+ *
+ * <p>
+ * <b>WARNING:</b> The serialized token is stored in the configuration and shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe
+ * conversion to a string, and is not intended to be secure.
+ *
+ * @param job
+ * the Hadoop job instance to be configured
+ * @param principal
+ * a valid Accumulo user name (user must have Table.CREATE permission)
+ * @param token
+ * the user's password
- * @throws org.apache.accumulo.core.client.AccumuloSecurityException
+ * @since 1.5.0
+ */
+ public static void setConnectorInfo(JobConf job, String principal, AuthenticationToken token) throws AccumuloSecurityException {
+ InputConfigurator.setConnectorInfo(CLASS, job, principal, token);
+ }
+
+ /**
+ * Sets the connector information needed to communicate with Accumulo in this job.
+ *
+ * <p>
+ * Stores the password in a file in HDFS and pulls that into the Distributed Cache in an attempt to be more secure than storing it in the Configuration.
+ *
+ * @param job
+ * the Hadoop job instance to be configured
+ * @param principal
+ * a valid Accumulo user name (user must have Table.CREATE permission)
+ * @param tokenFile
+ * the path to the token file
- * @throws AccumuloSecurityException
+ * @since 1.6.0
+ */
+ public static void setConnectorInfo(JobConf job, String principal, String tokenFile) throws AccumuloSecurityException {
+ InputConfigurator.setConnectorInfo(CLASS, job, principal, tokenFile);
+ }
+
+ /**
+ * Determines if the connector has been configured.
+ *
+ * @param job
+ * the Hadoop context for the configured job
+ * @return true if the connector has been configured, false otherwise
+ * @since 1.5.0
+ * @see #setConnectorInfo(JobConf, String, AuthenticationToken)
+ */
+ protected static Boolean isConnectorInfoSet(JobConf job) {
+ return InputConfigurator.isConnectorInfoSet(CLASS, job);
+ }
+
+ /**
+ * Gets the user name from the configuration.
+ *
+ * @param job
+ * the Hadoop context for the configured job
+ * @return the user name
+ * @since 1.5.0
+ * @see #setConnectorInfo(JobConf, String, AuthenticationToken)
+ */
+ protected static String getPrincipal(JobConf job) {
+ return InputConfigurator.getPrincipal(CLASS, job);
+ }
+
+ /**
+ * Gets the serialized token class from either the configuration or the token file.
+ *
+ * @since 1.5.0
+ * @deprecated since 1.6.0; Use {@link #getAuthenticationToken(JobConf)} instead.
+ */
+ @Deprecated
+ protected static String getTokenClass(JobConf job) {
+ return getAuthenticationToken(job).getClass().getName();
+ }
+
+ /**
+ * Gets the serialized token from either the configuration or the token file.
+ *
+ * @since 1.5.0
+ * @deprecated since 1.6.0; Use {@link #getAuthenticationToken(JobConf)} instead.
+ */
+ @Deprecated
+ protected static byte[] getToken(JobConf job) {
+ return AuthenticationToken.AuthenticationTokenSerializer.serialize(getAuthenticationToken(job));
+ }
+
+ /**
+ * Gets the authentication token from either the specified token file or directly from the configuration, whichever was used when the job was configured.
+ *
+ * @param job
+ * the Hadoop context for the configured job
+ * @return the principal's authentication token
+ * @since 1.6.0
+ * @see #setConnectorInfo(JobConf, String, AuthenticationToken)
+ * @see #setConnectorInfo(JobConf, String, String)
+ */
+ protected static AuthenticationToken getAuthenticationToken(JobConf job) {
+ return InputConfigurator.getAuthenticationToken(CLASS, job);
+ }
+
+ /**
+ * Configures a {@link org.apache.accumulo.core.client.ZooKeeperInstance} for this job.
+ *
+ * @param job
+ * the Hadoop job instance to be configured
+ * @param instanceName
+ * the Accumulo instance name
+ * @param zooKeepers
+ * a comma-separated list of zookeeper servers
+ * @since 1.5.0
+ * @deprecated since 1.6.0; Use {@link #setZooKeeperInstance(JobConf, ClientConfiguration)} instead.
+ */
+ @Deprecated
+ public static void setZooKeeperInstance(JobConf job, String instanceName, String zooKeepers) {
+ InputConfigurator.setZooKeeperInstance(CLASS, job, instanceName, zooKeepers);
+ }
+
+ /**
+ * Configures a {@link org.apache.accumulo.core.client.ZooKeeperInstance} for this job.
+ *
+ * @param job
+ * the Hadoop job instance to be configured
+ * @param clientConfig
+ * client configuration containing connection options
+ * @since 1.6.0
+ */
+ public static void setZooKeeperInstance(JobConf job, ClientConfiguration clientConfig) {
+ InputConfigurator.setZooKeeperInstance(CLASS, job, clientConfig);
+ }
+
+ /**
+ * Configures a {@link org.apache.accumulo.core.client.mock.MockInstance} for this job.
+ *
+ * @param job
+ * the Hadoop job instance to be configured
+ * @param instanceName
+ * the Accumulo instance name
+ * @since 1.5.0
+ */
+ public static void setMockInstance(JobConf job, String instanceName) {
+ InputConfigurator.setMockInstance(CLASS, job, instanceName);
+ }
+
+ /**
+ * Initializes an Accumulo {@link org.apache.accumulo.core.client.Instance} based on the configuration.
+ *
+ * @param job
+ * the Hadoop context for the configured job
+ * @return an Accumulo instance
+ * @since 1.5.0
+ * @see #setZooKeeperInstance(JobConf, String, String)
+ * @see #setMockInstance(JobConf, String)
+ */
+ protected static Instance getInstance(JobConf job) {
+ return InputConfigurator.getInstance(CLASS, job);
+ }
+
+ /**
+ * Sets the log level for this job.
+ *
+ * @param job
+ * the Hadoop job instance to be configured
+ * @param level
+ * the logging level
+ * @since 1.5.0
+ */
+ public static void setLogLevel(JobConf job, Level level) {
+ InputConfigurator.setLogLevel(CLASS, job, level);
+ }
+
+ /**
+ * Gets the log level from this configuration.
+ *
+ * @param job
+ * the Hadoop context for the configured job
+ * @return the log level
+ * @since 1.5.0
+ * @see #setLogLevel(JobConf, Level)
+ */
+ protected static Level getLogLevel(JobConf job) {
+ return InputConfigurator.getLogLevel(CLASS, job);
+ }
+
+ /**
+ * Sets the {@link org.apache.accumulo.core.security.Authorizations} used to scan. Must be a subset of the user's authorization. Defaults to the empty set.
+ *
+ * @param job
+ * the Hadoop job instance to be configured
+ * @param auths
+ * the user's authorizations
+ * @since 1.5.0
+ */
+ public static void setScanAuthorizations(JobConf job, Authorizations auths) {
+ InputConfigurator.setScanAuthorizations(CLASS, job, auths);
+ }
+
+ /**
+ * Gets the authorizations to set for the scans from the configuration.
+ *
+ * @param job
+ * the Hadoop context for the configured job
+ * @return the Accumulo scan authorizations
+ * @since 1.5.0
+ * @see #setScanAuthorizations(JobConf, Authorizations)
+ */
+ protected static Authorizations getScanAuthorizations(JobConf job) {
+ return InputConfigurator.getScanAuthorizations(CLASS, job);
+ }
+
+ /**
+ * Initializes an Accumulo {@link org.apache.accumulo.core.client.impl.TabletLocator} based on the configuration.
+ *
+ * @param job
+ * the Hadoop context for the configured job
+ * @return an Accumulo tablet locator
+ * @throws org.apache.accumulo.core.client.TableNotFoundException
+ * if the table name set on the configuration doesn't exist
+ * @since 1.6.0
+ */
+ protected static TabletLocator getTabletLocator(JobConf job, String tableId) throws TableNotFoundException {
+ return InputConfigurator.getTabletLocator(CLASS, job, tableId);
+ }
+
+ // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
+ /**
+ * Check whether a configuration is fully configured to be used with an Accumulo {@link InputFormat}.
+ *
+ * @param job
+ * the Hadoop context for the configured job
+ * @throws java.io.IOException
+ * if the context is improperly configured
+ * @since 1.5.0
+ */
+ protected static void validateOptions(JobConf job) throws IOException {
+ InputConfigurator.validateOptions(CLASS, job);
+ }
+
+ /**
+ * Fetches all {@link InputTableConfig}s that have been set on the given Hadoop job.
+ *
+ * @param job
+ * the Hadoop job instance to be configured
+ * @return the {@link InputTableConfig} objects set on the job
+ * @since 1.6.0
+ */
+ public static Map<String,InputTableConfig> getInputTableConfigs(JobConf job) {
+ return InputConfigurator.getInputTableConfigs(CLASS, job);
+ }
+
+ /**
+ * Fetches a {@link InputTableConfig} that has been set on the configuration for a specific table.
+ *
+ * <p>
+ * null is returned in the event that the table doesn't exist.
+ *
+ * @param job
+ * the Hadoop job instance to be configured
+ * @param tableName
+ * the table name for which to grab the config object
+ * @return the {@link InputTableConfig} for the given table
+ * @since 1.6.0
+ */
+ public static InputTableConfig getInputTableConfig(JobConf job, String tableName) {
+ return InputConfigurator.getInputTableConfig(CLASS, job, tableName);
+ }
+
+ /**
+ * An abstract base class to be used to create {@link org.apache.hadoop.mapred.RecordReader} instances that convert from Accumulo
+ * {@link org.apache.accumulo.core.data.Key}/{@link org.apache.accumulo.core.data.Value} pairs to the user's K/V types.
+ *
+ * Subclasses must implement {@link #next(Object, Object)} to update key and value, and also to update the following variables:
+ * <ul>
+ * <li>Key {@link #currentKey} (used for progress reporting)</li>
+ * <li>long {@link #numKeysRead} (used for progress reporting)</li>
+ * </ul>
+ */
+ protected abstract static class AbstractRecordReader<K,V> implements RecordReader<K,V> {
+ protected long numKeysRead;
+ protected Iterator<Map.Entry<Key,Value>> scannerIterator;
+ protected RangeInputSplit split;
+
+ /**
+ * Configures the iterators on a scanner for the given table name.
+ *
+ * @param job
+ * the Hadoop job configuration
+ * @param scanner
+ * the scanner for which to configure the iterators
+ * @param tableName
+ * the table name for which the scanner is configured
+ * @since 1.6.0
+ */
+ protected abstract void setupIterators(JobConf job, Scanner scanner, String tableName, RangeInputSplit split);
+
+ /**
+ * Initialize a scanner over the given input split using this task attempt configuration.
+ */
+ public void initialize(InputSplit inSplit, JobConf job) throws IOException {
+ Scanner scanner;
+ split = (RangeInputSplit) inSplit;
+ log.debug("Initializing input split: " + split.getRange());
+
+ Instance instance = split.getInstance();
+ if (null == instance) {
+ instance = getInstance(job);
+ }
+
+ String principal = split.getPrincipal();
+ if (null == principal) {
+ principal = getPrincipal(job);
+ }
+
+ AuthenticationToken token = split.getToken();
+ if (null == token) {
+ token = getAuthenticationToken(job);
+ }
+
+ Authorizations authorizations = split.getAuths();
+ if (null == authorizations) {
+ authorizations = getScanAuthorizations(job);
+ }
+
+ String table = split.getTableName();
+
+ // in case the table name changed, we can still use the previous name for configuration purposes,
+ // but the scanner will use the table id resolved at job setup time
+ InputTableConfig tableConfig = getInputTableConfig(job, split.getTableName());
+
+ Boolean isOffline = split.isOffline();
+ if (null == isOffline) {
+ isOffline = tableConfig.isOfflineScan();
+ }
+
+ Boolean isIsolated = split.isIsolatedScan();
+ if (null == isIsolated) {
+ isIsolated = tableConfig.shouldUseIsolatedScanners();
+ }
+
+ Boolean usesLocalIterators = split.usesLocalIterators();
+ if (null == usesLocalIterators) {
+ usesLocalIterators = tableConfig.shouldUseLocalIterators();
+ }
+
+ List<IteratorSetting> iterators = split.getIterators();
+ if (null == iterators) {
+ iterators = tableConfig.getIterators();
+ }
+
+ Collection<Pair<Text,Text>> columns = split.getFetchedColumns();
+ if (null == columns) {
+ columns = tableConfig.getFetchedColumns();
+ }
+
+ try {
+ log.debug("Creating connector with user: " + principal);
+ log.debug("Creating scanner for table: " + table);
+ log.debug("Authorizations are: " + authorizations);
+ if (isOffline) {
+ scanner = new OfflineScanner(instance, new Credentials(principal, token), split.getTableId(), authorizations);
+ } else if (instance instanceof MockInstance) {
+ scanner = instance.getConnector(principal, token).createScanner(split.getTableName(), authorizations);
+ } else {
+ scanner = new ScannerImpl(instance, new Credentials(principal, token), split.getTableId(), authorizations);
+ }
+ if (isIsolated) {
+ log.info("Creating isolated scanner");
+ scanner = new IsolatedScanner(scanner);
+ }
+ if (usesLocalIterators) {
+ log.info("Using local iterators");
+ scanner = new ClientSideIteratorScanner(scanner);
+ }
+ setupIterators(job, scanner, split.getTableName(), split);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+
+
+ // setup a scanner within the bounds of this split
+ for (Pair<Text,Text> c : columns) {
+ if (c.getSecond() != null) {
+ log.debug("Fetching column " + c.getFirst() + ":" + c.getSecond());
+ scanner.fetchColumn(c.getFirst(), c.getSecond());
+ } else {
+ log.debug("Fetching column family " + c.getFirst());
+ scanner.fetchColumnFamily(c.getFirst());
+ }
+ }
+
+ scanner.setRange(split.getRange());
+
+ numKeysRead = 0;
+
+ // do this last after setting all scanner options
+ scannerIterator = scanner.iterator();
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public long getPos() throws IOException {
+ return numKeysRead;
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ if (numKeysRead > 0 && currentKey == null)
+ return 1.0f;
+ return split.getProgress(currentKey);
+ }
+
+ protected Key currentKey = null;
+
+ }
+
+ Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobConf job, String tableId, List<Range> ranges) throws TableNotFoundException, AccumuloException,
+ AccumuloSecurityException {
+
+ Instance instance = getInstance(job);
+ Connector conn = instance.getConnector(getPrincipal(job), getAuthenticationToken(job));
+
+ return InputConfigurator.binOffline(tableId, ranges, instance, conn);
+ }
+
+ /**
+ * Read the metadata table to get tablets and match up ranges to them.
+ */
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ Level logLevel = getLogLevel(job);
+ log.setLevel(logLevel);
+ validateOptions(job);
+
+ Random random = new Random();
+ LinkedList<InputSplit> splits = new LinkedList<InputSplit>();
+ Map<String,InputTableConfig> tableConfigs = getInputTableConfigs(job);
+ for (Map.Entry<String,InputTableConfig> tableConfigEntry : tableConfigs.entrySet()) {
+ String tableName = tableConfigEntry.getKey();
+ InputTableConfig tableConfig = tableConfigEntry.getValue();
+
+ Instance instance = getInstance(job);
+ boolean mockInstance;
+ String tableId;
+ // resolve table name to id once, and use id from this point forward
+ if (instance instanceof MockInstance) {
+ tableId = "";
+ mockInstance = true;
+ } else {
+ try {
+ tableId = Tables.getTableId(instance, tableName);
+ } catch (TableNotFoundException e) {
+ throw new IOException(e);
+ }
+ mockInstance = false;
+ }
+
+ Authorizations auths = getScanAuthorizations(job);
+ String principal = getPrincipal(job);
+ AuthenticationToken token = getAuthenticationToken(job);
+
+ boolean autoAdjust = tableConfig.shouldAutoAdjustRanges();
+ List<Range> ranges = autoAdjust ? Range.mergeOverlapping(tableConfig.getRanges()) : tableConfig.getRanges();
+ if (ranges.isEmpty()) {
+ ranges = new ArrayList<Range>(1);
+ ranges.add(new Range());
+ }
+
+ // get the metadata information for these ranges
+ Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
+ TabletLocator tl;
+ try {
+ if (tableConfig.isOfflineScan()) {
+ binnedRanges = binOfflineTable(job, tableId, ranges);
+ while (binnedRanges == null) {
+ // Some tablets were still online, try again
+ UtilWaitThread.sleep(100 + random.nextInt(100)); // sleep randomly between 100 and 200 ms
+ binnedRanges = binOfflineTable(job, tableId, ranges);
+ }
+ } else {
+ tl = getTabletLocator(job, tableId);
+ // it's possible that the cache could contain complete, but old information about a table's tablets... so clear it
+ tl.invalidateCache();
+ Credentials creds = new Credentials(getPrincipal(job), getAuthenticationToken(job));
+
+ while (!tl.binRanges(creds, ranges, binnedRanges).isEmpty()) {
+ if (!(instance instanceof MockInstance)) {
+ if (!Tables.exists(instance, tableId))
+ throw new TableDeletedException(tableId);
+ if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
+ throw new TableOfflineException(instance, tableId);
+ }
+ binnedRanges.clear();
+ log.warn("Unable to locate bins for specified ranges. Retrying.");
+ UtilWaitThread.sleep(100 + random.nextInt(100)); // sleep randomly between 100 and 200 ms
+ tl.invalidateCache();
+ }
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+
+ HashMap<Range,ArrayList<String>> splitsToAdd = null;
+
+ if (!autoAdjust)
+ splitsToAdd = new HashMap<Range,ArrayList<String>>();
+
+ HashMap<String,String> hostNameCache = new HashMap<String,String>();
+ for (Map.Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
+ String ip = tserverBin.getKey().split(":", 2)[0];
+ String location = hostNameCache.get(ip);
+ if (location == null) {
+ InetAddress inetAddress = InetAddress.getByName(ip);
+ location = inetAddress.getCanonicalHostName();
+ hostNameCache.put(ip, location);
+ }
+ for (Map.Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
+ Range ke = extentRanges.getKey().toDataRange();
+ for (Range r : extentRanges.getValue()) {
+ if (autoAdjust) {
+ // divide ranges into smaller ranges, based on the tablets
+ RangeInputSplit split = new RangeInputSplit(tableName, tableId, ke.clip(r), new String[] {location});
+
+ split.setOffline(tableConfig.isOfflineScan());
+ split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
+ split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());
+ split.setMockInstance(mockInstance);
+ split.setFetchedColumns(tableConfig.getFetchedColumns());
+ split.setPrincipal(principal);
+ split.setToken(token);
+ split.setInstanceName(instance.getInstanceName());
+ split.setZooKeepers(instance.getZooKeepers());
+ split.setAuths(auths);
+ split.setIterators(tableConfig.getIterators());
+ split.setLogLevel(logLevel);
+
+ splits.add(split);
+ } else {
+ // don't divide ranges
+ ArrayList<String> locations = splitsToAdd.get(r);
+ if (locations == null)
+ locations = new ArrayList<String>(1);
+ locations.add(location);
+ splitsToAdd.put(r, locations);
+ }
+ }
+ }
+ }
+
+ if (!autoAdjust)
+ for (Map.Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet()) {
+ RangeInputSplit split = new RangeInputSplit(tableName, tableId, entry.getKey(), entry.getValue().toArray(new String[0]));
+
+ split.setOffline(tableConfig.isOfflineScan());
+ split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
+ split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());
+ split.setMockInstance(mockInstance);
+ split.setFetchedColumns(tableConfig.getFetchedColumns());
+ split.setPrincipal(principal);
+ split.setToken(token);
+ split.setInstanceName(instance.getInstanceName());
+ split.setZooKeepers(instance.getZooKeepers());
+ split.setAuths(auths);
+ split.setIterators(tableConfig.getIterators());
+ split.setLogLevel(logLevel);
+
+ splits.add(split);
+ }
+ }
+
+ return splits.toArray(new InputSplit[splits.size()]);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
index 02512a4,d7be37c..1ec4c41
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
@@@ -91,26 -88,7 +90,25 @@@ public class AccumuloOutputFormat imple
public static void setConnectorInfo(JobConf job, String principal, AuthenticationToken token) throws AccumuloSecurityException {
OutputConfigurator.setConnectorInfo(CLASS, job, principal, token);
}
-
+
+ /**
+ * Sets the connector information needed to communicate with Accumulo in this job.
+ *
+ * <p>
+ * Stores the password in a file in HDFS and pulls that into the Distributed Cache in an attempt to be more secure than storing it in the Configuration.
+ *
+ * @param job
+ * the Hadoop job instance to be configured
+ * @param principal
+ * a valid Accumulo user name (user must have Table.CREATE permission if {@link #setCreateTables(JobConf, boolean)} is set to true)
+ * @param tokenFile
+ * the path to the token file
- * @throws AccumuloSecurityException
+ * @since 1.6.0
+ */
+ public static void setConnectorInfo(JobConf job, String principal, String tokenFile) throws AccumuloSecurityException {
+ OutputConfigurator.setConnectorInfo(CLASS, job, principal, tokenFile);
+ }
+
/**
* Determines if the connector has been configured.
*