You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2013/12/05 00:58:44 UTC
[03/50] [abbrv] git commit: ACCUMULO-802 updated more shell commands to include the tableNamespaces option
ACCUMULO-802 updated more shell commands to include the tableNamespaces option
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5acd6a48
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5acd6a48
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5acd6a48
Branch: refs/heads/master
Commit: 5acd6a480a93b1c13639db8e8ba64a81c32651bb
Parents: 431f4e2
Author: Sean Hickey <ta...@gmail.com>
Authored: Tue Aug 6 08:54:05 2013 -0400
Committer: Christopher Tubbs <ct...@apache.org>
Committed: Wed Dec 4 18:46:09 2013 -0500
----------------------------------------------------------------------
.../client/admin/TableNamespaceOperations.java | 9 +
.../admin/TableNamespaceOperationsImpl.java | 46 +
.../core/client/admin/TableOperationsImpl.java | 411 ++---
.../core/client/impl/thrift/ClientService.java | 1431 ++++++++++++++++++
.../mock/MockTableNamespaceOperations.java | 17 +-
.../util/shell/commands/ConstraintCommand.java | 18 +-
.../util/shell/commands/DeleteIterCommand.java | 37 +-
.../util/shell/commands/ListIterCommand.java | 32 +-
.../core/util/shell/commands/OptUtil.java | 25 +
.../util/shell/commands/SetIterCommand.java | 98 +-
.../core/util/shell/commands/TablesCommand.java | 8 +-
core/src/main/thrift/client.thrift | 1 +
.../server/client/ClientServiceHandler.java | 46 +
.../server/conf/TableNamespaceConfWatcher.java | 107 ++
.../server/conf/TableNamespaceConfWatcher.java | 107 --
.../org/apache/accumulo/test/ShellServerIT.java | 4 +
16 files changed, 2048 insertions(+), 349 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/5acd6a48/core/src/main/java/org/apache/accumulo/core/client/admin/TableNamespaceOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/TableNamespaceOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/TableNamespaceOperations.java
index f572104..314d007 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/TableNamespaceOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TableNamespaceOperations.java
@@ -414,4 +414,13 @@ public interface TableNamespaceOperations {
* thrown if the table namespace no longer exists
*/
public Map<String,Integer> listConstraints(String tableNamespace) throws AccumuloException, TableNamespaceNotFoundException;
+
+
+ /**
+ * Test to see if the instance can load the given class as the given type. This check uses the table classpath property if it is set.
+ *
+ * @return true if the instance can load the given class as the given type, false otherwise
+ */
+ boolean testClassLoad(String namespace, String className, String asTypeName) throws TableNamespaceNotFoundException, AccumuloException,
+ AccumuloSecurityException;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/5acd6a48/core/src/main/java/org/apache/accumulo/core/client/admin/TableNamespaceOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/TableNamespaceOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/admin/TableNamespaceOperationsImpl.java
index 0d54b51..90d59af 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/TableNamespaceOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TableNamespaceOperationsImpl.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.core.client.admin;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -34,6 +35,7 @@ import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNamespaceExistsException;
import org.apache.accumulo.core.client.TableNamespaceNotEmptyException;
@@ -49,7 +51,10 @@ import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.thrift.ClientService;
import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.constraints.Constraint;
import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.master.thrift.MasterClientService;
import org.apache.accumulo.core.master.thrift.TableOperation;
import org.apache.accumulo.core.security.Credentials;
@@ -599,4 +604,45 @@ public class TableNamespaceOperationsImpl extends TableNamespaceOperationsHelper
return new TableOperationsImpl(instance, credentials);
}
+ @Override
+ public void attachIterator(String namespace, IteratorSetting setting, EnumSet<IteratorScope> scopes) throws AccumuloSecurityException, AccumuloException,
+ TableNamespaceNotFoundException {
+ testClassLoad(namespace, setting.getIteratorClass(), SortedKeyValueIterator.class.getName());
+ super.attachIterator(namespace, setting, scopes);
+ }
+
+ @Override
+ public int addConstraint(String namespace, String constraintClassName) throws AccumuloException, AccumuloSecurityException, TableNamespaceNotFoundException {
+ testClassLoad(namespace, constraintClassName, Constraint.class.getName());
+ return super.addConstraint(namespace, constraintClassName);
+ }
+
+ @Override
+ public boolean testClassLoad(final String namespace, final String className, final String asTypeName) throws TableNamespaceNotFoundException,
+ AccumuloException, AccumuloSecurityException {
+ ArgumentChecker.notNull(namespace, className, asTypeName);
+
+ try {
+ return ServerClient.executeRaw(instance, new ClientExecReturn<Boolean,ClientService.Client>() {
+ @Override
+ public Boolean execute(ClientService.Client client) throws Exception {
+ return client.checkTableNamespaceClass(Tracer.traceInfo(), credentials.toThrift(instance), namespace, className, asTypeName);
+ }
+ });
+ } catch (ThriftTableOperationException e) {
+ switch (e.getType()) {
+ case NOTFOUND:
+ throw new TableNamespaceNotFoundException(e);
+ case OTHER:
+ default:
+ throw new AccumuloException(e.description, e);
+ }
+ } catch (ThriftSecurityException e) {
+ throw new AccumuloSecurityException(e.user, e.code, e);
+ } catch (AccumuloException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new AccumuloException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/5acd6a48/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java
index 18d2e40..be1a783 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java
@@ -127,11 +127,11 @@ import org.apache.thrift.transport.TTransportException;
public class TableOperationsImpl extends TableOperationsHelper {
private Instance instance;
private Credentials credentials;
-
+
public static final String CLONE_EXCLUDE_PREFIX = "!";
private static final Logger log = Logger.getLogger(TableOperations.class);
-
+
/**
* @param instance
* the connection information for this instance
@@ -143,7 +143,7 @@ public class TableOperationsImpl extends TableOperationsHelper {
this.instance = instance;
this.credentials = credentials;
}
-
+
/**
* Retrieve a list of tables in Accumulo.
*
@@ -156,7 +156,7 @@ public class TableOperationsImpl extends TableOperationsHelper {
opTimer.stop("Fetched " + tableNames.size() + " table names in %DURATION%");
return tableNames;
}
-
+
/**
* A method to check if a table exists in Accumulo.
*
@@ -169,13 +169,13 @@ public class TableOperationsImpl extends TableOperationsHelper {
ArgumentChecker.notNull(tableName);
if (tableName.equals(MetadataTable.NAME) || tableName.equals(RootTable.NAME))
return true;
-
+
OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Checking if table " + tableName + " exists...");
boolean exists = Tables.getNameToIdMap(instance).containsKey(tableName);
opTimer.stop("Checked existance of " + exists + " in %DURATION%");
return exists;
}
-
+
/**
* Create a table with no special configuration
*
@@ -192,7 +192,7 @@ public class TableOperationsImpl extends TableOperationsHelper {
public void create(String tableName) throws AccumuloException, AccumuloSecurityException, TableExistsException {
create(tableName, true, TimeType.MILLIS);
}
-
+
/**
* @param tableName
* the name of the table
@@ -203,7 +203,7 @@ public class TableOperationsImpl extends TableOperationsHelper {
public void create(String tableName, boolean limitVersion) throws AccumuloException, AccumuloSecurityException, TableExistsException {
create(tableName, limitVersion, TimeType.MILLIS);
}
-
+
/**
* @param tableName
* the name of the table
@@ -215,12 +215,12 @@ public class TableOperationsImpl extends TableOperationsHelper {
@Override
public void create(String tableName, boolean limitVersion, TimeType timeType) throws AccumuloException, AccumuloSecurityException, TableExistsException {
ArgumentChecker.notNull(tableName, timeType);
-
+
List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()), ByteBuffer.wrap(timeType.name().getBytes()));
-
+
// Map<String,String> opts = IteratorUtil.generateInitialTableProperties(limitVersion);
Map<String,String> opts = new HashMap<String,String>();
-
+
String namespace = Tables.extractNamespace(tableName);
if (!namespaceExists(namespace)) {
String info = "Table namespace not found while trying to create table";
@@ -229,7 +229,7 @@ public class TableOperationsImpl extends TableOperationsHelper {
String info = "Can't create tables in the system namespace";
throw new IllegalArgumentException(info);
}
-
+
try {
doTableOperation(TableOperation.CREATE, args, opts);
} catch (TableNotFoundException e1) {
@@ -237,7 +237,7 @@ public class TableOperationsImpl extends TableOperationsHelper {
throw new RuntimeException(e1);
}
}
-
+
private long beginTableOperation() throws ThriftSecurityException, TException {
while (true) {
MasterClientService.Iface client = null;
@@ -252,7 +252,7 @@ public class TableOperationsImpl extends TableOperationsHelper {
}
}
}
-
+
private void executeTableOperation(long opid, TableOperation op, List<ByteBuffer> args, Map<String,String> opts, boolean autoCleanUp)
throws ThriftSecurityException, TException, ThriftTableOperationException {
while (true) {
@@ -269,7 +269,7 @@ public class TableOperationsImpl extends TableOperationsHelper {
}
}
}
-
+
private String waitForTableOperation(long opid) throws ThriftSecurityException, TException, ThriftTableOperationException {
while (true) {
MasterClientService.Iface client = null;
@@ -284,7 +284,7 @@ public class TableOperationsImpl extends TableOperationsHelper {
}
}
}
-
+
private void finishTableOperation(long opid) throws ThriftSecurityException, TException {
while (true) {
MasterClientService.Iface client = null;
@@ -300,16 +300,16 @@ public class TableOperationsImpl extends TableOperationsHelper {
}
}
}
-
+
private String doTableOperation(TableOperation op, List<ByteBuffer> args, Map<String,String> opts) throws AccumuloSecurityException, TableExistsException,
TableNotFoundException, AccumuloException {
return doTableOperation(op, args, opts, true);
}
-
+
private String doTableOperation(TableOperation op, List<ByteBuffer> args, Map<String,String> opts, boolean wait) throws AccumuloSecurityException,
TableExistsException, TableNotFoundException, AccumuloException {
Long opid = null;
-
+
try {
opid = beginTableOperation();
executeTableOperation(opid, op, args, opts, !wait);
@@ -348,14 +348,14 @@ public class TableOperationsImpl extends TableOperationsHelper {
}
}
}
-
+
private static class SplitEnv {
private String tableName;
private String tableId;
private ExecutorService executor;
private CountDownLatch latch;
private AtomicReference<Exception> exception;
-
+
SplitEnv(String tableName, String tableId, ExecutorService executor, CountDownLatch latch, AtomicReference<Exception> exception) {
this.tableName = tableName;
this.tableId = tableId;
@@ -364,47 +364,47 @@ public class TableOperationsImpl extends TableOperationsHelper {
this.exception = exception;
}
}
-
+
private class SplitTask implements Runnable {
-
+
private List<Text> splits;
private SplitEnv env;
-
+
SplitTask(SplitEnv env, List<Text> splits) {
this.env = env;
this.splits = splits;
}
-
+
@Override
public void run() {
try {
if (env.exception.get() != null)
return;
-
+
if (splits.size() <= 2) {
addSplits(env.tableName, new TreeSet<Text>(splits), env.tableId);
for (int i = 0; i < splits.size(); i++)
env.latch.countDown();
return;
}
-
+
int mid = splits.size() / 2;
-
+
// split the middle split point to ensure that child task split different tablets and can therefore
// run in parallel
addSplits(env.tableName, new TreeSet<Text>(splits.subList(mid, mid + 1)), env.tableId);
env.latch.countDown();
-
+
env.executor.submit(new SplitTask(env, splits.subList(0, mid)));
env.executor.submit(new SplitTask(env, splits.subList(mid + 1, splits.size())));
-
+
} catch (Exception e) {
env.exception.compareAndSet(null, e);
}
}
-
+
}
-
+
/**
* @param tableName
* the name of the table
@@ -420,19 +420,19 @@ public class TableOperationsImpl extends TableOperationsHelper {
@Override
public void addSplits(String tableName, SortedSet<Text> partitionKeys) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
String tableId = Tables.getTableId(instance, tableName);
-
+
List<Text> splits = new ArrayList<Text>(partitionKeys);
// should be sorted because we copied from a sorted set, but that makes assumptions about
// how the copy was done so resort to be sure.
Collections.sort(splits);
-
+
CountDownLatch latch = new CountDownLatch(splits.size());
AtomicReference<Exception> exception = new AtomicReference<Exception>(null);
-
+
ExecutorService executor = Executors.newFixedThreadPool(16, new NamingThreadFactory("addSplits"));
try {
executor.submit(new SplitTask(new SplitEnv(tableName, tableId, executor, latch, exception), splits));
-
+
while (!latch.await(100, TimeUnit.MILLISECONDS)) {
if (exception.get() != null) {
executor.shutdownNow();
@@ -455,24 +455,24 @@ public class TableOperationsImpl extends TableOperationsHelper {
executor.shutdown();
}
}
-
+
private void addSplits(String tableName, SortedSet<Text> partitionKeys, String tableId) throws AccumuloException, AccumuloSecurityException,
TableNotFoundException, AccumuloServerException {
TabletLocator tabLocator = TabletLocator.getLocator(instance, new Text(tableId));
-
+
for (Text split : partitionKeys) {
boolean successful = false;
int attempt = 0;
-
+
while (!successful) {
-
+
if (attempt > 0)
UtilWaitThread.sleep(100);
-
+
attempt++;
-
+
TabletLocation tl = tabLocator.locateTablet(credentials, split, false, false);
-
+
if (tl == null) {
if (!Tables.exists(instance, tableId))
throw new TableNotFoundException(tableId, tableName, null);
@@ -480,25 +480,25 @@ public class TableOperationsImpl extends TableOperationsHelper {
throw new TableOfflineException(instance, tableId);
continue;
}
-
+
try {
TabletClientService.Client client = ThriftUtil.getTServerClient(tl.tablet_location, ServerConfigurationUtil.getConfiguration(instance));
try {
OpTimer opTimer = null;
if (log.isTraceEnabled())
opTimer = new OpTimer(log, Level.TRACE).start("Splitting tablet " + tl.tablet_extent + " on " + tl.tablet_location + " at " + split);
-
+
client.splitTablet(Tracer.traceInfo(), credentials.toThrift(instance), tl.tablet_extent.toThrift(), TextUtil.getByteBuffer(split));
-
+
// just split it, might as well invalidate it in the cache
tabLocator.invalidateCache(tl.tablet_extent);
-
+
if (opTimer != null)
opTimer.stop("Split tablet in %DURATION%");
} finally {
ThriftUtil.returnClient(client);
}
-
+
} catch (TApplicationException tae) {
throw new AccumuloServerException(tl.tablet_location, tae);
} catch (TTransportException e) {
@@ -516,15 +516,15 @@ public class TableOperationsImpl extends TableOperationsHelper {
tabLocator.invalidateCache(tl.tablet_location);
continue;
}
-
+
successful = true;
}
}
}
-
+
@Override
public void merge(String tableName, Text start, Text end) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
-
+
ArgumentChecker.notNull(tableName);
ByteBuffer EMPTY = ByteBuffer.allocate(0);
List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()), start == null ? EMPTY : TextUtil.getByteBuffer(start), end == null ? EMPTY
@@ -537,10 +537,10 @@ public class TableOperationsImpl extends TableOperationsHelper {
throw new RuntimeException(e);
}
}
-
+
@Override
public void deleteRows(String tableName, Text start, Text end) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
-
+
ArgumentChecker.notNull(tableName);
ByteBuffer EMPTY = ByteBuffer.allocate(0);
List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()), start == null ? EMPTY : TextUtil.getByteBuffer(start), end == null ? EMPTY
@@ -553,7 +553,7 @@ public class TableOperationsImpl extends TableOperationsHelper {
throw new RuntimeException(e);
}
}
-
+
/**
* @param tableName
* the name of the table
@@ -561,13 +561,13 @@ public class TableOperationsImpl extends TableOperationsHelper {
*/
@Override
public Collection<Text> listSplits(String tableName) throws TableNotFoundException, AccumuloSecurityException {
-
+
ArgumentChecker.notNull(tableName);
-
+
String tableId = Tables.getTableId(instance, tableName);
-
+
TreeMap<KeyExtent,String> tabletLocations = new TreeMap<KeyExtent,String>();
-
+
while (true) {
try {
tabletLocations.clear();
@@ -580,25 +580,25 @@ public class TableOperationsImpl extends TableOperationsHelper {
if (!Tables.exists(instance, tableId)) {
throw new TableNotFoundException(tableId, tableName, null);
}
-
+
if (e instanceof RuntimeException && e.getCause() instanceof AccumuloSecurityException) {
throw (AccumuloSecurityException) e.getCause();
}
-
+
log.info(e.getMessage() + " ... retrying ...");
UtilWaitThread.sleep(3000);
}
}
-
+
ArrayList<Text> endRows = new ArrayList<Text>(tabletLocations.size());
-
+
for (KeyExtent ke : tabletLocations.keySet())
if (ke.getEndRow() != null)
endRows.add(ke.getEndRow());
-
+
return endRows;
}
-
+
@Deprecated
@Override
public Collection<Text> getSplits(String tableName) throws TableNotFoundException {
@@ -608,7 +608,7 @@ public class TableOperationsImpl extends TableOperationsHelper {
throw new RuntimeException(e);
}
}
-
+
/**
* @param tableName
* the name of the table
@@ -620,15 +620,15 @@ public class TableOperationsImpl extends TableOperationsHelper {
@Override
public Collection<Text> listSplits(String tableName, int maxSplits) throws TableNotFoundException, AccumuloSecurityException {
Collection<Text> endRows = listSplits(tableName);
-
+
if (endRows.size() <= maxSplits)
return endRows;
-
+
double r = (maxSplits + 1) / (double) (endRows.size());
double pos = 0;
-
+
ArrayList<Text> subset = new ArrayList<Text>(maxSplits);
-
+
int j = 0;
for (int i = 0; i < endRows.size() && j < maxSplits; i++) {
pos += r;
@@ -638,10 +638,10 @@ public class TableOperationsImpl extends TableOperationsHelper {
pos -= 1;
}
}
-
+
return subset;
}
-
+
@Deprecated
@Override
public Collection<Text> getSplits(String tableName, int maxSplits) throws TableNotFoundException {
@@ -651,7 +651,7 @@ public class TableOperationsImpl extends TableOperationsHelper {
throw new RuntimeException(e);
}
}
-
+
/**
* Delete a table
*
@@ -667,51 +667,51 @@ public class TableOperationsImpl extends TableOperationsHelper {
@Override
public void delete(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
ArgumentChecker.notNull(tableName);
-
+
List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()));
Map<String,String> opts = new HashMap<String,String>();
-
+
try {
doTableOperation(TableOperation.DELETE, args, opts);
} catch (TableExistsException e) {
// should not happen
throw new RuntimeException(e);
}
-
+
}
-
+
@Override
public void clone(String srcTableName, String newTableName, boolean flush, Map<String,String> propertiesToSet, Set<String> propertiesToExclude)
throws AccumuloSecurityException, TableNotFoundException, AccumuloException, TableExistsException {
-
+
ArgumentChecker.notNull(srcTableName, newTableName);
-
+
String namespace = Tables.extractNamespace(newTableName);
if (!namespaceExists(namespace)) {
String info = "Table namespace not found while cloning table";
throw new IllegalArgumentException(new TableNamespaceNotFoundException(null, namespace, info));
}
-
+
String srcTableId = Tables.getTableId(instance, srcTableName);
-
+
if (flush)
_flush(srcTableId, null, null, true);
-
+
if (propertiesToExclude == null)
propertiesToExclude = Collections.emptySet();
-
+
if (propertiesToSet == null)
propertiesToSet = Collections.emptyMap();
-
+
// TODO ACCUMULO-1565 needs to be fixed before the commented-out code below will work.
- HashSet<String> excludeProps = new HashSet<String>();//getUniqueNamespaceProperties(namespace, srcTableName);
+ HashSet<String> excludeProps = new HashSet<String>();// getUniqueNamespaceProperties(namespace, srcTableName);
for (String p : propertiesToExclude) {
excludeProps.add(p);
}
-
+
if (!Collections.disjoint(excludeProps, propertiesToSet.keySet()))
throw new IllegalArgumentException("propertiesToSet and propertiesToExclude not disjoint");
-
+
List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(srcTableId.getBytes()), ByteBuffer.wrap(newTableName.getBytes()));
Map<String,String> opts = new HashMap<String,String>();
for (Entry<String,String> entry : propertiesToSet.entrySet()) {
@@ -719,14 +719,14 @@ public class TableOperationsImpl extends TableOperationsHelper {
throw new IllegalArgumentException("Property can not start with " + CLONE_EXCLUDE_PREFIX);
opts.put(entry.getKey(), entry.getValue());
}
-
+
for (String prop : propertiesToExclude) {
opts.put(CLONE_EXCLUDE_PREFIX + prop, "");
}
-
+
doTableOperation(TableOperation.CLONE, args, opts);
}
-
+
// get the properties that are only in the table namespace so that we can exclude them when copying table properties
// TODO ACCUMULO-1565 needs fixed first
private HashSet<String> getUniqueNamespaceProperties(String namespace, String table) throws TableNotFoundException, AccumuloException {
@@ -749,7 +749,7 @@ public class TableOperationsImpl extends TableOperationsHelper {
}
return props;
}
-
+
/**
* Rename a table
*
@@ -769,18 +769,18 @@ public class TableOperationsImpl extends TableOperationsHelper {
@Override
public void rename(String oldTableName, String newTableName) throws AccumuloSecurityException, TableNotFoundException, AccumuloException,
TableExistsException {
-
+
String namespace = Tables.extractNamespace(newTableName);
if (!namespaceExists(namespace)) {
String info = "Table namespace not found while renaming table";
throw new IllegalArgumentException(new TableNamespaceNotFoundException(null, namespace, info));
}
-
+
List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(oldTableName.getBytes()), ByteBuffer.wrap(newTableName.getBytes()));
Map<String,String> opts = new HashMap<String,String>();
doTableOperation(TableOperation.RENAME, args, opts);
}
-
+
/**
* @deprecated since 1.4 {@link #flush(String, Text, Text, boolean)}
*/
@@ -793,7 +793,7 @@ public class TableOperationsImpl extends TableOperationsHelper {
throw new AccumuloException(e.getMessage(), e);
}
}
-
+
/**
* Flush a table
*
@@ -808,31 +808,31 @@ public class TableOperationsImpl extends TableOperationsHelper {
@Override
public void flush(String tableName, Text start, Text end, boolean wait) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
ArgumentChecker.notNull(tableName);
-
+
String tableId = Tables.getTableId(instance, tableName);
_flush(tableId, start, end, wait);
}
-
+
@Override
public void compact(String tableName, Text start, Text end, boolean flush, boolean wait) throws AccumuloSecurityException, TableNotFoundException,
AccumuloException {
compact(tableName, start, end, new ArrayList<IteratorSetting>(), flush, wait);
}
-
+
@Override
public void compact(String tableName, Text start, Text end, List<IteratorSetting> iterators, boolean flush, boolean wait) throws AccumuloSecurityException,
TableNotFoundException, AccumuloException {
ArgumentChecker.notNull(tableName);
ByteBuffer EMPTY = ByteBuffer.allocate(0);
-
+
String tableId = Tables.getTableId(instance, tableName);
-
+
if (flush)
_flush(tableId, start, end, true);
-
+
List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getBytes()), start == null ? EMPTY : TextUtil.getByteBuffer(start), end == null ? EMPTY
: TextUtil.getByteBuffer(end), ByteBuffer.wrap(IteratorUtil.encodeIteratorSettings(iterators)));
-
+
Map<String,String> opts = new HashMap<String,String>();
try {
doTableOperation(TableOperation.COMPACT, args, opts, wait);
@@ -841,13 +841,13 @@ public class TableOperationsImpl extends TableOperationsHelper {
throw new RuntimeException(e);
}
}
-
+
@Override
public void cancelCompaction(String tableName) throws AccumuloSecurityException, TableNotFoundException, AccumuloException {
String tableId = Tables.getTableId(instance, tableName);
-
+
List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getBytes()));
-
+
Map<String,String> opts = new HashMap<String,String>();
try {
doTableOperation(TableOperation.COMPACT_CANCEL, args, opts, true);
@@ -855,17 +855,17 @@ public class TableOperationsImpl extends TableOperationsHelper {
// should not happen
throw new RuntimeException(e);
}
-
+
}
-
+
private void _flush(String tableId, Text start, Text end, boolean wait) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
-
+
try {
long flushID;
-
+
// used to pass the table name. but the tableid associated with a table name could change between calls.
// so pass the tableid to both calls
-
+
while (true) {
MasterClientService.Iface client = null;
try {
@@ -879,7 +879,7 @@ public class TableOperationsImpl extends TableOperationsHelper {
MasterClient.close(client);
}
}
-
+
while (true) {
MasterClientService.Iface client = null;
try {
@@ -912,7 +912,7 @@ public class TableOperationsImpl extends TableOperationsHelper {
throw new AccumuloException(e);
}
}
-
+
/**
* Sets a property on a table
*
@@ -937,7 +937,7 @@ public class TableOperationsImpl extends TableOperationsHelper {
}
});
}
-
+
/**
* Removes a property from a table
*
@@ -960,7 +960,7 @@ public class TableOperationsImpl extends TableOperationsHelper {
}
});
}
-
+
/**
* Gets properties of a table
*
@@ -993,9 +993,9 @@ public class TableOperationsImpl extends TableOperationsHelper {
} catch (Exception e) {
throw new AccumuloException(e);
}
-
+
}
-
+
/**
* Sets a tables locality groups. A tables locality groups can be changed at any time.
*
@@ -1015,22 +1015,22 @@ public class TableOperationsImpl extends TableOperationsHelper {
// ensure locality groups do not overlap
HashSet<Text> all = new HashSet<Text>();
for (Entry<String,Set<Text>> entry : groups.entrySet()) {
-
+
if (!Collections.disjoint(all, entry.getValue())) {
throw new IllegalArgumentException("Group " + entry.getKey() + " overlaps with another group");
}
-
+
all.addAll(entry.getValue());
}
-
+
for (Entry<String,Set<Text>> entry : groups.entrySet()) {
Set<Text> colFams = entry.getValue();
String value = LocalityGroupUtil.encodeColumnFamilies(colFams);
setProperty(tableName, Property.TABLE_LOCALITY_GROUP_PREFIX + entry.getKey(), value);
}
-
+
setProperty(tableName, Property.TABLE_LOCALITY_GROUPS.getKey(), StringUtil.join(groups.keySet(), ","));
-
+
// remove anything extraneous
String prefix = Property.TABLE_LOCALITY_GROUP_PREFIX.getKey();
for (Entry<String,String> entry : getProperties(tableName)) {
@@ -1040,14 +1040,14 @@ public class TableOperationsImpl extends TableOperationsHelper {
// one:
String[] parts = property.split("\\.");
String group = parts[parts.length - 1];
-
+
if (!groups.containsKey(group)) {
removeProperty(tableName, property);
}
}
}
}
-
+
/**
*
* Gets the locality groups currently set for a table.
@@ -1064,22 +1064,22 @@ public class TableOperationsImpl extends TableOperationsHelper {
public Map<String,Set<Text>> getLocalityGroups(String tableName) throws AccumuloException, TableNotFoundException {
AccumuloConfiguration conf = new ConfigurationCopy(this.getProperties(tableName));
Map<String,Set<ByteSequence>> groups = LocalityGroupUtil.getLocalityGroups(conf);
-
+
Map<String,Set<Text>> groups2 = new HashMap<String,Set<Text>>();
for (Entry<String,Set<ByteSequence>> entry : groups.entrySet()) {
-
+
HashSet<Text> colFams = new HashSet<Text>();
-
+
for (ByteSequence bs : entry.getValue()) {
colFams.add(new Text(bs.toArray()));
}
-
+
groups2.put(entry.getKey(), colFams);
}
-
+
return groups2;
}
-
+
/**
* @param tableName
* the name of the table
@@ -1103,7 +1103,7 @@ public class TableOperationsImpl extends TableOperationsHelper {
throw new IllegalArgumentException("maximum splits must be >= 1");
if (maxSplits == 1)
return Collections.singleton(range);
-
+
Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
String tableId = Tables.getTableId(instance, tableName);
TabletLocator tl = TabletLocator.getLocator(instance, new Text(tableId));
@@ -1114,24 +1114,24 @@ public class TableOperationsImpl extends TableOperationsHelper {
throw new TableDeletedException(tableId);
if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
throw new TableOfflineException(instance, tableId);
-
+
log.warn("Unable to locate bins for specified range. Retrying.");
// sleep randomly between 100 and 200ms
UtilWaitThread.sleep(100 + (int) (Math.random() * 100));
binnedRanges.clear();
tl.invalidateCache();
}
-
+
// group key extents to get <= maxSplits
LinkedList<KeyExtent> unmergedExtents = new LinkedList<KeyExtent>();
List<KeyExtent> mergedExtents = new ArrayList<KeyExtent>();
-
+
for (Map<KeyExtent,List<Range>> map : binnedRanges.values())
unmergedExtents.addAll(map.keySet());
-
+
// the sort method is efficient for linked list
Collections.sort(unmergedExtents);
-
+
while (unmergedExtents.size() + mergedExtents.size() > maxSplits) {
if (unmergedExtents.size() >= 2) {
KeyExtent first = unmergedExtents.removeFirst();
@@ -1144,15 +1144,15 @@ public class TableOperationsImpl extends TableOperationsHelper {
unmergedExtents.addAll(mergedExtents);
mergedExtents.clear();
}
-
+
}
-
+
mergedExtents.addAll(unmergedExtents);
-
+
Set<Range> ranges = new HashSet<Range>();
for (KeyExtent k : mergedExtents)
ranges.add(k.toDataRange().clip(range));
-
+
return ranges;
}
@@ -1191,14 +1191,14 @@ public class TableOperationsImpl extends TableOperationsHelper {
public void importDirectory(String tableName, String dir, String failureDir, boolean setTime) throws IOException, AccumuloSecurityException,
TableNotFoundException, AccumuloException {
ArgumentChecker.notNull(tableName, dir, failureDir);
-
+
Path dirPath = checkPath(dir, "Bulk", "");
Path failPath = checkPath(failureDir, "Bulk", "failure");
List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()), ByteBuffer.wrap(dirPath.toString().getBytes()),
ByteBuffer.wrap(failPath.toString().getBytes()), ByteBuffer.wrap((setTime + "").getBytes()));
Map<String,String> opts = new HashMap<String,String>();
-
+
try {
doTableOperation(TableOperation.BULK_IMPORT, args, opts);
} catch (TableExistsException e) {
@@ -1208,10 +1208,10 @@ public class TableOperationsImpl extends TableOperationsHelper {
// return new BulkImportHelper(instance, credentials, tableName).importDirectory(new Path(dir), new Path(failureDir), numThreads, numAssignThreads,
// disableGC);
}
-
+
private void waitForTableStateTransition(String tableId, TableState expectedState) throws AccumuloException, TableNotFoundException,
AccumuloSecurityException {
-
+
Text startRow = null;
Text lastRow = null;
@@ -1225,12 +1225,13 @@ public class TableOperationsImpl extends TableOperationsHelper {
throw new AccumuloException("Unexpected table state " + tableId + " " + Tables.getTableState(instance, tableId) + " != " + expectedState);
}
}
-
+
Range range = new KeyExtent(new Text(tableId), null, null).toMetadataRange();
- if (startRow == null || lastRow == null)
+ if (startRow == null || lastRow == null)
range = new KeyExtent(new Text(tableId), null, null).toMetadataRange();
else
range = new Range(startRow, lastRow);
+
String metaTable = MetadataTable.NAME;
if (tableId.equals(MetadataTable.ID))
metaTable = RootTable.NAME;
@@ -1239,83 +1240,84 @@ public class TableOperationsImpl extends TableOperationsHelper {
TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
scanner.setRange(range);
-
+
RowIterator rowIter = new RowIterator(scanner);
-
+
KeyExtent lastExtent = null;
-
+
int total = 0;
int waitFor = 0;
int holes = 0;
Text continueRow = null;
MapCounter<String> serverCounts = new MapCounter<String>();
-
+
while (rowIter.hasNext()) {
Iterator<Entry<Key,Value>> row = rowIter.next();
-
+
total++;
KeyExtent extent = null;
String future = null;
String current = null;
-
+
while (row.hasNext()) {
Entry<Key,Value> entry = row.next();
Key key = entry.getKey();
-
+
if (key.getColumnFamily().equals(TabletsSection.FutureLocationColumnFamily.NAME))
future = entry.getValue().toString();
-
+
if (key.getColumnFamily().equals(TabletsSection.CurrentLocationColumnFamily.NAME))
current = entry.getValue().toString();
-
+
if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key))
extent = new KeyExtent(key.getRow(), entry.getValue());
}
-
+
if ((expectedState == TableState.ONLINE && current == null) || (expectedState == TableState.OFFLINE && (future != null || current != null))) {
if (continueRow == null)
continueRow = extent.getMetadataEntry();
waitFor++;
lastRow = extent.getMetadataEntry();
-
- if(current != null)
+
+ if (current != null)
serverCounts.increment(current, 1);
- if(future != null)
+ if (future != null)
serverCounts.increment(future, 1);
}
-
+
if (!extent.getTableId().toString().equals(tableId)) {
throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
}
-
+
if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
holes++;
}
-
+
lastExtent = extent;
}
-
+
if (continueRow != null) {
startRow = continueRow;
}
-
+
if (holes > 0 || total == 0) {
startRow = null;
lastRow = null;
}
-
+
if (waitFor > 0 || holes > 0 || total == 0) {
long waitTime;
long maxPerServer = 0;
- if(serverCounts.size() > 0){
+ if (serverCounts.size() > 0) {
maxPerServer = Collections.max(serverCounts.values());
waitTime = maxPerServer * 10;
- }else
+ } else
waitTime = waitFor * 10;
waitTime = Math.max(100, waitTime);
waitTime = Math.min(5000, waitTime);
- log.trace("Waiting for " + waitFor + "("+maxPerServer+") tablets, startRow = " + startRow + " lastRow = "+lastRow+", holes=" + holes+" sleeping:"+waitTime+"ms");
+ log.trace("Waiting for " + waitFor + "(" + maxPerServer + ") tablets, startRow = " + startRow + " lastRow = " + lastRow + ", holes=" + holes
+ + " sleeping:" + waitTime + "ms");
UtilWaitThread.sleep(waitTime);
} else {
break;
@@ -1323,11 +1325,12 @@ public class TableOperationsImpl extends TableOperationsHelper {
}
}
+
@Override
public void offline(String tableName) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
offline(tableName, false);
}
-
+
/**
*
* @param tableName
@@ -1340,28 +1343,28 @@ public class TableOperationsImpl extends TableOperationsHelper {
*/
@Override
public void offline(String tableName, boolean wait) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
-
+
ArgumentChecker.notNull(tableName);
String tableId = Tables.getTableId(instance, tableName);
List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getBytes()));
Map<String,String> opts = new HashMap<String,String>();
-
+
try {
doTableOperation(TableOperation.OFFLINE, args, opts);
} catch (TableExistsException e) {
// should not happen
throw new RuntimeException(e);
}
-
- if(wait)
+
+ if (wait)
waitForTableStateTransition(tableId, TableState.OFFLINE);
}
-
+
@Override
public void online(String tableName) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
online(tableName, false);
}
-
+
/**
*
* @param tableName
@@ -1378,18 +1381,18 @@ public class TableOperationsImpl extends TableOperationsHelper {
String tableId = Tables.getTableId(instance, tableName);
List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getBytes()));
Map<String,String> opts = new HashMap<String,String>();
-
+
try {
doTableOperation(TableOperation.ONLINE, args, opts);
} catch (TableExistsException e) {
// should not happen
throw new RuntimeException(e);
}
-
- if(wait)
+
+ if (wait)
waitForTableStateTransition(tableId, TableState.ONLINE);
}
-
+
/**
* Clears the tablet locator cache for a specified table
*
@@ -1404,7 +1407,7 @@ public class TableOperationsImpl extends TableOperationsHelper {
TabletLocator tabLocator = TabletLocator.getLocator(instance, new Text(Tables.getTableId(instance, tableName)));
tabLocator.invalidateCache();
}
-
+
/**
* Get a mapping of table name to internal table id.
*
@@ -1414,7 +1417,7 @@ public class TableOperationsImpl extends TableOperationsHelper {
public Map<String,String> tableIdMap() {
return Tables.getNameToIdMap(instance);
}
-
+
@Override
public Text getMaxRow(String tableName, Authorizations auths, Text startRow, boolean startInclusive, Text endRow, boolean endInclusive)
throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
@@ -1422,10 +1425,10 @@ public class TableOperationsImpl extends TableOperationsHelper {
Scanner scanner = instance.getConnector(credentials.getPrincipal(), credentials.getToken()).createScanner(tableName, auths);
return FindMax.findMax(scanner, startRow, startInclusive, endRow, endInclusive);
}
-
+
@Override
public List<DiskUsage> getDiskUsage(Set<String> tableNames) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
-
+
List<TDiskUsage> diskUsages = null;
while (diskUsages == null) {
Pair<String,Client> pair = null;
@@ -1457,18 +1460,18 @@ public class TableOperationsImpl extends TableOperationsHelper {
ServerClient.close(pair.getSecond());
}
}
-
+
List<DiskUsage> finalUsages = new ArrayList<DiskUsage>();
for (TDiskUsage diskUsage : diskUsages) {
finalUsages.add(new DiskUsage(new TreeSet<String>(diskUsage.getTables()), diskUsage.getUsage()));
}
-
+
return finalUsages;
}
-
+
public static Map<String,String> getExportedProps(FileSystem fs, Path path) throws IOException {
HashMap<String,String> props = new HashMap<String,String>();
-
+
ZipInputStream zis = new ZipInputStream(fs.open(path));
try {
ZipEntry zipEntry;
@@ -1480,7 +1483,7 @@ public class TableOperationsImpl extends TableOperationsHelper {
String sa[] = line.split("=", 2);
props.put(sa[0], sa[1]);
}
-
+
break;
}
}
@@ -1489,11 +1492,11 @@ public class TableOperationsImpl extends TableOperationsHelper {
}
return props;
}
-
+
@Override
public void importTable(String tableName, String importDir) throws TableExistsException, AccumuloException, AccumuloSecurityException {
ArgumentChecker.notNull(tableName, importDir);
-
+
try {
importDir = checkPath(importDir, "Table", "").toString();
} catch (IOException e) {
@@ -1503,45 +1506,45 @@ public class TableOperationsImpl extends TableOperationsHelper {
try {
FileSystem fs = new Path(importDir).getFileSystem(CachedConfiguration.getInstance());
Map<String,String> props = getExportedProps(fs, new Path(importDir, Constants.EXPORT_FILE));
-
+
for (String propKey : props.keySet()) {
if (Property.isClassProperty(propKey) && !props.get(propKey).contains(Constants.CORE_PACKAGE_NAME)) {
Logger.getLogger(this.getClass()).info(
"Imported table sets '" + propKey + "' to '" + props.get(propKey) + "'. Ensure this class is on Accumulo classpath.");
}
}
-
+
} catch (IOException ioe) {
Logger.getLogger(this.getClass()).warn("Failed to check if imported table references external java classes : " + ioe.getMessage());
}
-
+
String namespace = Tables.extractNamespace(tableName);
if (!namespaceExists(namespace)) {
String info = "Table namespace not found while importing to table";
throw new RuntimeException(new TableNamespaceNotFoundException(null, namespace, info));
}
-
+
List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()), ByteBuffer.wrap(importDir.getBytes()));
-
+
Map<String,String> opts = Collections.emptyMap();
-
+
try {
doTableOperation(TableOperation.IMPORT, args, opts);
} catch (TableNotFoundException e1) {
// should not happen
throw new RuntimeException(e1);
}
-
+
}
-
+
@Override
public void exportTable(String tableName, String exportDir) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
ArgumentChecker.notNull(tableName, exportDir);
-
+
List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()), ByteBuffer.wrap(exportDir.getBytes()));
-
+
Map<String,String> opts = Collections.emptyMap();
-
+
try {
doTableOperation(TableOperation.EXPORT, args, opts);
} catch (TableExistsException e1) {
@@ -1549,12 +1552,12 @@ public class TableOperationsImpl extends TableOperationsHelper {
throw new RuntimeException(e1);
}
}
-
+
@Override
public boolean testClassLoad(final String tableName, final String className, final String asTypeName) throws TableNotFoundException, AccumuloException,
AccumuloSecurityException {
ArgumentChecker.notNull(tableName, className, asTypeName);
-
+
try {
return ServerClient.executeRaw(instance, new ClientExecReturn<Boolean,ClientService.Client>() {
@Override
@@ -1578,20 +1581,20 @@ public class TableOperationsImpl extends TableOperationsHelper {
throw new AccumuloException(e);
}
}
-
+
@Override
public void attachIterator(String tableName, IteratorSetting setting, EnumSet<IteratorScope> scopes) throws AccumuloSecurityException, AccumuloException,
TableNotFoundException {
testClassLoad(tableName, setting.getIteratorClass(), SortedKeyValueIterator.class.getName());
super.attachIterator(tableName, setting, scopes);
}
-
+
@Override
public int addConstraint(String tableName, String constraintClassName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
testClassLoad(tableName, constraintClassName, Constraint.class.getName());
return super.addConstraint(tableName, constraintClassName);
}
-
+
private boolean namespaceExists(String namespace) {
return TableNamespaces.getNameToIdMap(instance).containsKey(namespace);
}