You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2017/09/12 19:57:59 UTC
[accumulo] 01/02: Merge branch '1.7' into 1.8
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch 1.8
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 862753270d5f729445478ad0e95995903f36083c
Merge: 9c4858f b0016c3
Author: Mike Miller <mm...@apache.org>
AuthorDate: Tue Sep 12 15:07:16 2017 -0400
Merge branch '1.7' into 1.8
.../core/client/impl/ConditionalWriterImpl.java | 3 +-
.../core/client/impl/InstanceOperationsImpl.java | 3 +-
.../accumulo/core/client/impl/MasterClient.java | 3 +-
.../core/client/impl/ReplicationClient.java | 3 +-
.../core/client/impl/TableOperationsImpl.java | 2 +-
.../impl/TabletServerBatchReaderIterator.java | 3 +-
.../core/client/impl/TabletServerBatchWriter.java | 2 +-
.../accumulo/core/client/impl/ThriftScanner.java | 3 +-
.../core/client/impl/ThriftTransportKey.java | 2 +-
.../core/client/impl/ThriftTransportPool.java | 2 +-
.../apache/accumulo/core/client/impl/Writer.java | 3 +-
.../accumulo/core/rpc/TTimeoutTransport.java | 5 +-
.../org/apache/accumulo/core/rpc/ThriftUtil.java | 13 +-
.../org/apache/accumulo/core/util/AddressUtil.java | 2 +-
.../org/apache/accumulo/core/util/HostAndPort.java | 270 +++++++++++++++++++++
.../apache/accumulo/core/util/ServerServices.java | 2 +-
.../core/client/impl/ThriftTransportKeyTest.java | 3 +-
.../main/java/org/apache/accumulo/proxy/Proxy.java | 2 +-
.../accumulo/server/client/BulkImporter.java | 3 +-
.../accumulo/server/master/LiveTServerSet.java | 3 +-
.../server/master/state/TServerInstance.java | 5 +-
.../server/master/state/ZooTabletStateStore.java | 3 +-
.../apache/accumulo/server/rpc/ServerAddress.java | 3 +-
.../apache/accumulo/server/rpc/TServerUtils.java | 21 +-
.../org/apache/accumulo/server/util/Admin.java | 2 +-
.../apache/accumulo/server/util/LocalityCheck.java | 5 +-
.../server/util/VerifyTabletAssignments.java | 2 +-
.../server/watcher/MonitorLog4jWatcher.java | 5 +-
.../org/apache/accumulo/server/AccumuloTest.java | 3 +-
.../accumulo/server/master/LiveTServerSetTest.java | 3 +-
.../master/balancer/ChaoticLoadBalancerTest.java | 3 +-
.../master/balancer/DefaultLoadBalancerTest.java | 3 +-
.../master/balancer/TableLoadBalancerTest.java | 2 +-
.../apache/accumulo/gc/SimpleGarbageCollector.java | 5 +-
.../replication/CloseWriteAheadLogReferences.java | 2 +-
.../apache/accumulo/master/tableOps/LoadFiles.java | 3 +-
.../MasterReplicationCoordinatorTest.java | 3 +-
.../master/state/RootTabletStateStoreTest.java | 3 +-
.../java/org/apache/accumulo/monitor/Monitor.java | 3 +-
.../apache/accumulo/monitor/ZooKeeperStatus.java | 3 +-
.../accumulo/monitor/servlets/MasterServlet.java | 6 +-
.../accumulo/monitor/servlets/ScanServlet.java | 3 +-
.../accumulo/monitor/servlets/TServersServlet.java | 5 +-
.../org/apache/accumulo/tserver/TabletServer.java | 11 +-
.../tserver/replication/AccumuloReplicaSystem.java | 3 +-
.../apache/accumulo/tserver/tablet/TabletTest.java | 5 +-
.../org/apache/accumulo/test/TotalQueuedIT.java | 3 +-
.../org/apache/accumulo/test/WrongTabletTest.java | 2 +-
.../test/functional/WatchTheWatchCountIT.java | 5 +-
.../accumulo/test/functional/ZombieTServer.java | 3 +-
.../apache/accumulo/test/master/MergeStateIT.java | 3 +-
.../metadata/MetadataBatchScanTest.java | 2 +-
.../test/performance/scan/CollectTabletStats.java | 4 +-
.../test/performance/thrift/NullTserver.java | 2 +-
.../accumulo/test/proxy/ProxyDurabilityIT.java | 2 +-
.../accumulo/test/proxy/SimpleProxyBase.java | 2 +-
.../test/proxy/TestProxyInstanceOperations.java | 3 +-
.../accumulo/test/proxy/TestProxyReadWrite.java | 3 +-
.../test/proxy/TestProxySecurityOperations.java | 3 +-
.../test/proxy/TestProxyTableOperations.java | 3 +-
...GarbageCollectorCommunicatesWithTServersIT.java | 3 +-
.../replication/MultiTserverReplicationIT.java | 2 +-
62 files changed, 362 insertions(+), 130 deletions(-)
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
index 98a15ca,4812378..defcfd4
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
@@@ -77,7 -76,9 +77,8 @@@ import org.apache.accumulo.core.trace.T
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.core.util.BadArgumentException;
import org.apache.accumulo.core.util.ByteBufferUtil;
+ import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.NamingThreadFactory;
-import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.util.LoggingRunnable;
import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
index a780846,79ad2dc..622e671
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
@@@ -31,6 -29,8 +31,7 @@@ import org.apache.accumulo.core.client.
import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
import org.apache.accumulo.core.master.thrift.MasterClientService;
import org.apache.accumulo.core.rpc.ThriftUtil;
+ import org.apache.accumulo.core.util.HostAndPort;
-import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
index dfbb2cc,0c3e0e6..353cb58
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
@@@ -121,11 -114,10 +122,10 @@@ import org.apache.hadoop.io.Text
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
- import com.google.common.net.HostAndPort;
public class TableOperationsImpl extends TableOperationsHelper {
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
index 8796d3c,f9295b1..cce187b
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
@@@ -64,20 -60,20 +64,19 @@@ import org.apache.accumulo.core.tablets
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.trace.Tracer;
import org.apache.accumulo.core.util.ByteBufferUtil;
+ import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.OpTimer;
-import org.apache.hadoop.io.Text;
import org.apache.htrace.wrappers.TraceRunnable;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
- import com.google.common.net.HostAndPort;
-
public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value>> {
- private static final Logger log = Logger.getLogger(TabletServerBatchReaderIterator.class);
+ private static final Logger log = LoggerFactory.getLogger(TabletServerBatchReaderIterator.class);
private final ClientContext context;
private final Instance instance;
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
index 7685d0f,26a406e..2153dff
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
@@@ -69,7 -69,9 +69,8 @@@ import org.apache.accumulo.core.trace.S
import org.apache.accumulo.core.trace.Trace;
import org.apache.accumulo.core.trace.Tracer;
import org.apache.accumulo.core.trace.thrift.TInfo;
+ import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.SimpleThreadPool;
-import org.apache.hadoop.io.Text;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
import org.apache.thrift.TServiceClient;
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
index be98327,38c8cb4..4f38e0c
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
@@@ -64,17 -59,16 +64,16 @@@ import org.apache.accumulo.core.trace.S
import org.apache.accumulo.core.trace.Trace;
import org.apache.accumulo.core.trace.Tracer;
import org.apache.accumulo.core.trace.thrift.TInfo;
+ import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.OpTimer;
import org.apache.hadoop.io.Text;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
- import com.google.common.net.HostAndPort;
-
public class ThriftScanner {
- private static final Logger log = Logger.getLogger(ThriftScanner.class);
+ private static final Logger log = LoggerFactory.getLogger(ThriftScanner.class);
public static final Map<TabletType,Set<String>> serversWaitedForWrites = new EnumMap<>(TabletType.class);
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/Writer.java
index 90691ef,dc7650f..3adf45c
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/Writer.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/Writer.java
@@@ -35,6 -32,8 +35,7 @@@ import org.apache.accumulo.core.tablets
import org.apache.accumulo.core.tabletserver.thrift.TDurability;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.trace.Tracer;
+ import org.apache.accumulo.core.util.HostAndPort;
-import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.hadoop.io.Text;
import org.apache.thrift.TException;
import org.apache.thrift.TServiceClient;
diff --cc server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
index c9af520,f52e52e..cfc2b40
--- a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
@@@ -58,8 -56,10 +58,9 @@@ import org.apache.accumulo.core.rpc.Thr
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.trace.Tracer;
import org.apache.accumulo.core.util.CachedConfiguration;
+ import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.NamingThreadFactory;
import org.apache.accumulo.core.util.StopWatch;
-import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.fate.util.LoggingRunnable;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
diff --cc server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
index 148b6cc,bbf7aab..0816145
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
@@@ -28,8 -26,8 +28,9 @@@ import java.util.Map
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
+ import org.apache.accumulo.core.util.HostAndPort;
import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --cc server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index 8d28b9e,833bfdc..dc683e8
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@@ -38,7 -40,9 +38,8 @@@ import org.apache.accumulo.core.rpc.Ssl
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.UGIAssumingTransportFactory;
import org.apache.accumulo.core.util.Daemon;
+ import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.SimpleThreadPool;
-import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.fate.util.LoggingRunnable;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.util.Halt;
@@@ -399,10 -391,9 +398,10 @@@ public class TServerUtils
String hostname, fqdn;
try {
- hostname = InetAddress.getByName(address.getHostText()).getCanonicalHostName();
+ hostname = InetAddress.getByName(address.getHost()).getCanonicalHostName();
fqdn = InetAddress.getLocalHost().getCanonicalHostName();
} catch (UnknownHostException e) {
+ transport.close();
throw new TTransportException(e);
}
diff --cc server/base/src/test/java/org/apache/accumulo/server/master/balancer/TableLoadBalancerTest.java
index f34cc3d,feef108..95ece35
--- a/server/base/src/test/java/org/apache/accumulo/server/master/balancer/TableLoadBalancerTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/master/balancer/TableLoadBalancerTest.java
@@@ -45,13 -46,8 +46,12 @@@ import org.easymock.EasyMock
import org.junit.Assert;
import org.junit.Test;
+import com.google.common.collect.ImmutableMap;
- import com.google.common.net.HostAndPort;
+
public class TableLoadBalancerTest {
+ private static Map<String,String> TABLE_ID_MAP = ImmutableMap.of("t1", "a1", "t2", "b12", "t3", "c4");
+
static private TServerInstance mkts(String address, String session) throws Exception {
return new TServerInstance(HostAndPort.fromParts(address, 1234), session);
}
diff --cc server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 76292c8,c1972a3..eef856c
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@@ -21,6 -19,8 +21,7 @@@ import static com.google.common.util.co
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.UnknownHostException;
+ import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
diff --cc server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index 0c09396,4ac8e5c..5c5ef16
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@@ -43,14 -46,13 +43,15 @@@ import org.apache.accumulo.core.securit
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.trace.Span;
import org.apache.accumulo.core.trace.Trace;
-import org.apache.accumulo.core.trace.Tracer;
import org.apache.accumulo.core.trace.thrift.TInfo;
+ import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.log.WalStateManager;
+import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
+import org.apache.accumulo.server.log.WalStateManager.WalState;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.thrift.TException;
@@@ -59,7 -61,9 +60,6 @@@ import org.slf4j.Logger
import org.slf4j.LoggerFactory;
import com.google.common.base.Stopwatch;
- import com.google.common.net.HostAndPort;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
import com.google.protobuf.InvalidProtocolBufferException;
/**
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
index ff285fa,276f141..6c4e493
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
@@@ -38,12 -35,14 +38,13 @@@ import org.apache.accumulo.core.client.
import org.apache.accumulo.core.client.impl.thrift.ClientService;
import org.apache.accumulo.core.client.impl.thrift.TableOperation;
import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
-import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.master.thrift.BulkImportState;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.trace.Tracer;
+ import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.SimpleThreadPool;
-import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.master.Master;
import org.apache.accumulo.server.fs.VolumeManager;
diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/ZooKeeperStatus.java
index 62f872f,97f9ee7..49a3178
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/ZooKeeperStatus.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/ZooKeeperStatus.java
@@@ -28,6 -26,8 +28,7 @@@ import java.util.concurrent.TimeUnit
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.rpc.TTimeoutTransport;
+ import org.apache.accumulo.core.util.HostAndPort;
-import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 1deb1cc,b4cbf03..6eaea59
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -261,10 -247,7 +262,8 @@@ import org.apache.zookeeper.KeeperExcep
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
- import com.google.common.net.HostAndPort;
-
public class TabletServer extends AccumuloServerContext implements Runnable {
+
private static final Logger log = LoggerFactory.getLogger(TabletServer.class);
private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000;
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index 7000eb3,28eca21..18c5593
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@@ -62,6 -60,8 +62,7 @@@ import org.apache.accumulo.core.securit
import org.apache.accumulo.core.trace.ProbabilitySampler;
import org.apache.accumulo.core.trace.Span;
import org.apache.accumulo.core.trace.Trace;
+ import org.apache.accumulo.core.util.HostAndPort;
-import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
import org.apache.accumulo.server.fs.VolumeManager;
diff --cc test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java
index ea3f680,0000000..8c2f8c9
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java
@@@ -1,131 -1,0 +1,130 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
++import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.minicluster.MemoryUnit;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
- import com.google.common.net.HostAndPort;
-
+// see ACCUMULO-1950
+public class TotalQueuedIT extends ConfigurableMacBase {
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setNumTservers(1);
+ cfg.setDefaultMemory(cfg.getDefaultMemory() * 2, MemoryUnit.BYTE);
+ cfg.useMiniDFS();
+ }
+
+ int SMALL_QUEUE_SIZE = 100000;
+ int LARGE_QUEUE_SIZE = SMALL_QUEUE_SIZE * 10;
+ static final long N = 1000000;
+
+ @Test(timeout = 4 * 60 * 1000)
+ public void test() throws Exception {
+ Random random = new Random();
+ Connector c = getConnector();
+ c.instanceOperations().setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), "" + SMALL_QUEUE_SIZE);
+ String tableName = getUniqueNames(1)[0];
+ c.tableOperations().create(tableName);
+ c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "9999");
+ c.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "999");
+ sleepUninterruptibly(1, TimeUnit.SECONDS);
+ // get an idea of how fast the syncs occur
+ byte row[] = new byte[250];
+ BatchWriterConfig cfg = new BatchWriterConfig();
+ cfg.setMaxWriteThreads(10);
+ cfg.setMaxLatency(1, TimeUnit.SECONDS);
+ cfg.setMaxMemory(1024 * 1024);
+ long realSyncs = getSyncs();
+ BatchWriter bw = c.createBatchWriter(tableName, cfg);
+ long now = System.currentTimeMillis();
+ long bytesSent = 0;
+ for (int i = 0; i < N; i++) {
+ random.nextBytes(row);
+ Mutation m = new Mutation(row);
+ m.put("", "", "");
+ bw.addMutation(m);
+ bytesSent += m.estimatedMemoryUsed();
+ }
+ bw.close();
+ long diff = System.currentTimeMillis() - now;
+ double secs = diff / 1000.;
+ double syncs = bytesSent / SMALL_QUEUE_SIZE;
+ double syncsPerSec = syncs / secs;
+ System.out.println(String.format("Sent %d bytes in %f secs approximately %d syncs (%f syncs per sec)", bytesSent, secs, ((long) syncs), syncsPerSec));
+ long update = getSyncs();
+ System.out.println("Syncs " + (update - realSyncs));
+ realSyncs = update;
+
+ // Now with a much bigger total queue
+ c.instanceOperations().setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), "" + LARGE_QUEUE_SIZE);
+ c.tableOperations().flush(tableName, null, null, true);
+ sleepUninterruptibly(1, TimeUnit.SECONDS);
+ bw = c.createBatchWriter(tableName, cfg);
+ now = System.currentTimeMillis();
+ bytesSent = 0;
+ for (int i = 0; i < N; i++) {
+ random.nextBytes(row);
+ Mutation m = new Mutation(row);
+ m.put("", "", "");
+ bw.addMutation(m);
+ bytesSent += m.estimatedMemoryUsed();
+ }
+ bw.close();
+ diff = System.currentTimeMillis() - now;
+ secs = diff / 1000.;
+ syncs = bytesSent / LARGE_QUEUE_SIZE;
+ syncsPerSec = syncs / secs;
+ System.out.println(String.format("Sent %d bytes in %f secs approximately %d syncs (%f syncs per sec)", bytesSent, secs, ((long) syncs), syncsPerSec));
+ update = getSyncs();
+ System.out.println("Syncs " + (update - realSyncs));
+ assertTrue(update - realSyncs < realSyncs);
+ }
+
+ private long getSyncs() throws Exception {
+ Connector c = getConnector();
+ ServerConfigurationFactory confFactory = new ServerConfigurationFactory(c.getInstance());
+ AccumuloServerContext context = new AccumuloServerContext(confFactory);
+ for (String address : c.instanceOperations().getTabletServers()) {
+ TabletClientService.Client client = ThriftUtil.getTServerClient(HostAndPort.fromString(address), context);
+ TabletServerStatus status = client.getTabletServerStatus(null, context.rpcCreds());
+ return status.syncs;
+ }
+ return 0;
+ }
+
+}
diff --cc test/src/main/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java
index 84b944b,0000000..fc0937f
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java
@@@ -1,81 -1,0 +1,80 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertTrue;
+
+import java.net.Socket;
+
+import org.apache.accumulo.core.client.Connector;
++import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
- import com.google.common.net.HostAndPort;
-
+// ACCUMULO-2757 - make sure we don't make too many more watchers
+public class WatchTheWatchCountIT extends ConfigurableMacBase {
+ private static final Logger log = LoggerFactory.getLogger(WatchTheWatchCountIT.class);
+
+ public int defaultOverrideSeconds() {
+ return 60;
+ }
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setNumTservers(3);
+ }
+
+ @Test
+ public void test() throws Exception {
+ Connector c = getConnector();
+ String[] tableNames = getUniqueNames(3);
+ for (String tableName : tableNames) {
+ c.tableOperations().create(tableName);
+ }
+ c.tableOperations().list();
+ String zooKeepers = c.getInstance().getZooKeepers();
+ final long MIN = 475L;
+ final long MAX = 700L;
+ long total = 0;
+ final HostAndPort hostAndPort = HostAndPort.fromString(zooKeepers);
+ for (int i = 0; i < 5; i++) {
- Socket socket = new Socket(hostAndPort.getHostText(), hostAndPort.getPort());
++ Socket socket = new Socket(hostAndPort.getHost(), hostAndPort.getPort());
+ try {
+ socket.getOutputStream().write("wchs\n".getBytes(), 0, 5);
+ byte[] buffer = new byte[1024];
+ int n = socket.getInputStream().read(buffer);
+ String response = new String(buffer, 0, n);
+ total = Long.parseLong(response.split(":")[1].trim());
+ log.info("Total: {}", total);
+ if (total > MIN && total < MAX) {
+ break;
+ }
+ log.debug("Expected number of watchers to be contained in ({}, {}), but actually was {}. Sleeping and retrying", MIN, MAX, total);
+ Thread.sleep(5000);
+ } finally {
+ socket.close();
+ }
+ }
+
+ assertTrue("Expected number of watchers to be contained in (" + MIN + ", " + MAX + "), but actually was " + total, total > MIN && total < MAX);
+
+ }
+
+}
diff --cc test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
index 6c20cda,d2f0216..04cd3a4
--- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
@@@ -31,8 -29,10 +31,9 @@@ import org.apache.accumulo.core.tablets
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
import org.apache.accumulo.core.trace.Tracer;
import org.apache.accumulo.core.trace.thrift.TInfo;
+ import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.ServerServices;
import org.apache.accumulo.core.util.ServerServices.Service;
-import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
diff --cc test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java
index 2d233c4,0000000..f7842e0
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java
@@@ -1,204 -1,0 +1,203 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.master;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.BatchDeleter;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.master.thrift.MasterState;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
++import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.master.state.MergeStats;
+import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.master.state.Assignment;
+import org.apache.accumulo.server.master.state.CurrentState;
+import org.apache.accumulo.server.master.state.MergeInfo;
+import org.apache.accumulo.server.master.state.MergeState;
+import org.apache.accumulo.server.master.state.MetaDataStateStore;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
- import com.google.common.net.HostAndPort;
-
+public class MergeStateIT extends ConfigurableMacBase {
+
+ private static class MockCurrentState implements CurrentState {
+
+ TServerInstance someTServer = new TServerInstance(HostAndPort.fromParts("127.0.0.1", 1234), 0x123456);
+ MergeInfo mergeInfo;
+
+ MockCurrentState(MergeInfo info) {
+ this.mergeInfo = info;
+ }
+
+ @Override
+ public Set<String> onlineTables() {
+ return Collections.singleton("t");
+ }
+
+ @Override
+ public Set<TServerInstance> onlineTabletServers() {
+ return Collections.singleton(someTServer);
+ }
+
+ @Override
+ public Collection<MergeInfo> merges() {
+ return Collections.singleton(mergeInfo);
+ }
+
+ @Override
+ public Set<KeyExtent> migrationsSnapshot() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public MasterState getMasterState() {
+ return MasterState.NORMAL;
+ }
+
+ @Override
+ public Set<TServerInstance> shutdownServers() {
+ return Collections.emptySet();
+ }
+ }
+
+ private static void update(Connector c, Mutation m) throws TableNotFoundException, MutationsRejectedException {
+ BatchWriter bw = c.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ bw.addMutation(m);
+ bw.close();
+ }
+
+ @Test
+ public void test() throws Exception {
+ AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
+ Connector connector = getConnector();
+ EasyMock.expect(context.getConnector()).andReturn(connector).anyTimes();
+ EasyMock.replay(context);
+ connector.securityOperations().grantTablePermission(connector.whoami(), MetadataTable.NAME, TablePermission.WRITE);
+ BatchWriter bw = connector.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+
+ // Create a fake METADATA table with these splits
+ String splits[] = {"a", "e", "j", "o", "t", "z"};
+ // create metadata for a table "t" with the splits above
+ String tableId = "t";
+ Text pr = null;
+ for (String s : splits) {
+ Text split = new Text(s);
+ Mutation prevRow = KeyExtent.getPrevRowUpdateMutation(new KeyExtent(tableId, split, pr));
+ prevRow.put(TabletsSection.CurrentLocationColumnFamily.NAME, new Text("123456"), new Value("127.0.0.1:1234".getBytes()));
+ ChoppedColumnFamily.CHOPPED_COLUMN.put(prevRow, new Value("junk".getBytes()));
+ bw.addMutation(prevRow);
+ pr = split;
+ }
+ // Add the default tablet
+ Mutation defaultTablet = KeyExtent.getPrevRowUpdateMutation(new KeyExtent(tableId, null, pr));
+ defaultTablet.put(TabletsSection.CurrentLocationColumnFamily.NAME, new Text("123456"), new Value("127.0.0.1:1234".getBytes()));
+ bw.addMutation(defaultTablet);
+ bw.close();
+
+ // Read out the TabletLocationStates
+ MockCurrentState state = new MockCurrentState(new MergeInfo(new KeyExtent(tableId, new Text("p"), new Text("e")), MergeInfo.Operation.MERGE));
+
+ // Verify the tablet state: hosted, and count
+ MetaDataStateStore metaDataStateStore = new MetaDataStateStore(context, state);
+ int count = 0;
+ for (TabletLocationState tss : metaDataStateStore) {
+ if (tss != null)
+ count++;
+ }
+ Assert.assertEquals(0, count); // the normal case is to skip tablets in a good state
+
+ // Create the hole
+ // Split the tablet at one end of the range
+ Mutation m = new KeyExtent(tableId, new Text("t"), new Text("p")).getPrevRowUpdateMutation();
+ TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN.put(m, new Value("0.5".getBytes()));
+ TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN.put(m, KeyExtent.encodePrevEndRow(new Text("o")));
+ update(connector, m);
+
+ // do the state check
+ MergeStats stats = scan(state, metaDataStateStore);
+ MergeState newState = stats.nextMergeState(connector, state);
+ Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, newState);
+
+ // unassign the tablets
+ BatchDeleter deleter = connector.createBatchDeleter(MetadataTable.NAME, Authorizations.EMPTY, 1000, new BatchWriterConfig());
+ deleter.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
+ deleter.setRanges(Collections.singletonList(new Range()));
+ deleter.delete();
+
+ // now we should be ready to merge but, we have inconsistent metadata
+ stats = scan(state, metaDataStateStore);
+ Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, stats.nextMergeState(connector, state));
+
+ // finish the split
+ KeyExtent tablet = new KeyExtent(tableId, new Text("p"), new Text("o"));
+ m = tablet.getPrevRowUpdateMutation();
+ TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN.put(m, new Value("0.5".getBytes()));
+ update(connector, m);
+ metaDataStateStore.setLocations(Collections.singletonList(new Assignment(tablet, state.someTServer)));
+
+ // onos... there's a new tablet online
+ stats = scan(state, metaDataStateStore);
+ Assert.assertEquals(MergeState.WAITING_FOR_CHOPPED, stats.nextMergeState(connector, state));
+
+ // chop it
+ m = tablet.getPrevRowUpdateMutation();
+ ChoppedColumnFamily.CHOPPED_COLUMN.put(m, new Value("junk".getBytes()));
+ update(connector, m);
+
+ stats = scan(state, metaDataStateStore);
+ Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, stats.nextMergeState(connector, state));
+
+ // take it offline
+ m = tablet.getPrevRowUpdateMutation();
+ Collection<Collection<String>> walogs = Collections.emptyList();
+ metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, state.someTServer, null, null, walogs, false)), null);
+
+ // now we can split
+ stats = scan(state, metaDataStateStore);
+ Assert.assertEquals(MergeState.MERGING, stats.nextMergeState(connector, state));
+
+ }
+
+ private MergeStats scan(MockCurrentState state, MetaDataStateStore metaDataStateStore) {
+ MergeStats stats = new MergeStats(state.mergeInfo);
+ stats.getMergeInfo().setState(MergeState.WAITING_FOR_OFFLINE);
+ for (TabletLocationState tss : metaDataStateStore) {
+ stats.update(tss.extent, tss.getState(state.onlineTabletServers()), tss.chopped, false);
+ }
+ return stats;
+ }
+}
diff --cc test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
index 05a0c54,7bf9f13..cb2af2a
--- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
@@@ -76,11 -75,9 +76,11 @@@ import org.apache.accumulo.server.zooke
import org.apache.thrift.TException;
import com.beust.jcommander.Parameter;
- import com.google.common.net.HostAndPort;
+ import org.apache.accumulo.core.util.HostAndPort;
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
+
/**
* The purpose of this class is to server as fake tserver that is a data sink like /dev/null. NullTserver modifies the metadata location entries for a table to
* point to it. This allows thrift performance to be measured by running any client code that writes to a table.
diff --cc test/src/main/java/org/apache/accumulo/test/proxy/ProxyDurabilityIT.java
index c1115ab,0000000..fd8ad83
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/proxy/ProxyDurabilityIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/proxy/ProxyDurabilityIT.java
@@@ -1,146 -1,0 +1,146 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.proxy;
+
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.security.Authorizations;
++import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.accumulo.proxy.Proxy;
+import org.apache.accumulo.proxy.thrift.AccumuloProxy.Client;
+import org.apache.accumulo.proxy.thrift.Column;
+import org.apache.accumulo.proxy.thrift.ColumnUpdate;
+import org.apache.accumulo.proxy.thrift.Condition;
+import org.apache.accumulo.proxy.thrift.ConditionalStatus;
+import org.apache.accumulo.proxy.thrift.ConditionalUpdates;
+import org.apache.accumulo.proxy.thrift.ConditionalWriterOptions;
+import org.apache.accumulo.proxy.thrift.Durability;
+import org.apache.accumulo.proxy.thrift.TimeType;
+import org.apache.accumulo.proxy.thrift.WriterOptions;
+import org.apache.accumulo.server.util.PortUtils;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.apache.thrift.server.TServer;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
- import com.google.common.net.HostAndPort;
+
+public class ProxyDurabilityIT extends ConfigurableMacBase {
+
+ @Override
+ protected int defaultTimeoutSeconds() {
+ return 120;
+ }
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+ cfg.setNumTservers(1);
+ }
+
+ private static ByteBuffer bytes(String value) {
+ return ByteBuffer.wrap(value.getBytes());
+ }
+
+ @Test
+ public void testDurability() throws Exception {
+ Connector c = getConnector();
+ Properties props = new Properties();
+ // Avoid issues with locally installed client configuration files with custom properties
+ File emptyFile = Files.createTempFile(null, null).toFile();
+ emptyFile.deleteOnExit();
+ props.put("instance", c.getInstance().getInstanceName());
+ props.put("zookeepers", c.getInstance().getZooKeepers());
+ props.put("tokenClass", PasswordToken.class.getName());
+ props.put("clientConfigurationFile", emptyFile.toString());
+
+ TJSONProtocol.Factory protocol = new TJSONProtocol.Factory();
+
+ int proxyPort = PortUtils.getRandomFreePort();
+ final TServer proxyServer = Proxy.createProxyServer(HostAndPort.fromParts("localhost", proxyPort), protocol, props).server;
+ while (!proxyServer.isServing())
+ sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ Client client = new TestProxyClient("localhost", proxyPort, protocol).proxy();
+ Map<String,String> properties = new TreeMap<>();
+ properties.put("password", ROOT_PASSWORD);
+ ByteBuffer login = client.login("root", properties);
+
+ String tableName = getUniqueNames(1)[0];
+ client.createTable(login, tableName, true, TimeType.MILLIS);
+ assertTrue(c.tableOperations().exists(tableName));
+
+ WriterOptions options = new WriterOptions();
+ options.setDurability(Durability.NONE);
+ String writer = client.createWriter(login, tableName, options);
+ Map<ByteBuffer,List<ColumnUpdate>> cells = new TreeMap<>();
+ ColumnUpdate column = new ColumnUpdate(bytes("cf"), bytes("cq"));
+ column.setValue("value".getBytes());
+ cells.put(bytes("row"), Collections.singletonList(column));
+ client.update(writer, cells);
+ client.closeWriter(writer);
+ assertEquals(1, count(tableName));
+ restartTServer();
+ assertEquals(0, count(tableName));
+
+ ConditionalWriterOptions cfg = new ConditionalWriterOptions();
+ cfg.setDurability(Durability.SYNC);
+ String cwriter = client.createConditionalWriter(login, tableName, cfg);
+ ConditionalUpdates updates = new ConditionalUpdates();
+ updates.addToConditions(new Condition(new Column(bytes("cf"), bytes("cq"), bytes(""))));
+ updates.addToUpdates(column);
+ Map<ByteBuffer,ConditionalStatus> status = client.updateRowsConditionally(cwriter, Collections.singletonMap(bytes("row"), updates));
+ assertEquals(ConditionalStatus.ACCEPTED, status.get(bytes("row")));
+ assertEquals(1, count(tableName));
+ restartTServer();
+ assertEquals(1, count(tableName));
+
+ proxyServer.stop();
+ }
+
+ private void restartTServer() throws Exception {
+ for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+ cluster.killProcess(ServerType.TABLET_SERVER, proc);
+ }
+ cluster.start();
+ }
+
+ private int count(String tableName) throws Exception {
+ return Iterators.size((getConnector().createScanner(tableName, Authorizations.EMPTY)).iterator());
+ }
+
+}
diff --cc test/src/main/java/org/apache/accumulo/test/proxy/SimpleProxyBase.java
index 5bb4ad6,0000000..c8f311b
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/proxy/SimpleProxyBase.java
+++ b/test/src/main/java/org/apache/accumulo/test/proxy/SimpleProxyBase.java
@@@ -1,2709 -1,0 +1,2709 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.proxy;
+
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.cluster.ClusterUser;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.Namespaces;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.iterators.DebugIterator;
+import org.apache.accumulo.core.iterators.DevNull;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.user.SummingCombiner;
+import org.apache.accumulo.core.iterators.user.VersioningIterator;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.ByteBufferUtil;
+import org.apache.accumulo.examples.simple.constraints.MaxMutationSize;
++import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.examples.simple.constraints.NumericValueConstraint;
+import org.apache.accumulo.harness.MiniClusterHarness;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.harness.TestingKdc;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.proxy.Proxy;
+import org.apache.accumulo.proxy.thrift.AccumuloProxy.Client;
+import org.apache.accumulo.proxy.thrift.AccumuloSecurityException;
+import org.apache.accumulo.proxy.thrift.ActiveCompaction;
+import org.apache.accumulo.proxy.thrift.ActiveScan;
+import org.apache.accumulo.proxy.thrift.BatchScanOptions;
+import org.apache.accumulo.proxy.thrift.Column;
+import org.apache.accumulo.proxy.thrift.ColumnUpdate;
+import org.apache.accumulo.proxy.thrift.CompactionReason;
+import org.apache.accumulo.proxy.thrift.CompactionStrategyConfig;
+import org.apache.accumulo.proxy.thrift.CompactionType;
+import org.apache.accumulo.proxy.thrift.Condition;
+import org.apache.accumulo.proxy.thrift.ConditionalStatus;
+import org.apache.accumulo.proxy.thrift.ConditionalUpdates;
+import org.apache.accumulo.proxy.thrift.ConditionalWriterOptions;
+import org.apache.accumulo.proxy.thrift.DiskUsage;
+import org.apache.accumulo.proxy.thrift.IteratorScope;
+import org.apache.accumulo.proxy.thrift.IteratorSetting;
+import org.apache.accumulo.proxy.thrift.Key;
+import org.apache.accumulo.proxy.thrift.KeyValue;
+import org.apache.accumulo.proxy.thrift.MutationsRejectedException;
+import org.apache.accumulo.proxy.thrift.NamespaceExistsException;
+import org.apache.accumulo.proxy.thrift.NamespaceNotEmptyException;
+import org.apache.accumulo.proxy.thrift.NamespaceNotFoundException;
+import org.apache.accumulo.proxy.thrift.NamespacePermission;
+import org.apache.accumulo.proxy.thrift.PartialKey;
+import org.apache.accumulo.proxy.thrift.Range;
+import org.apache.accumulo.proxy.thrift.ScanColumn;
+import org.apache.accumulo.proxy.thrift.ScanOptions;
+import org.apache.accumulo.proxy.thrift.ScanResult;
+import org.apache.accumulo.proxy.thrift.ScanState;
+import org.apache.accumulo.proxy.thrift.ScanType;
+import org.apache.accumulo.proxy.thrift.SystemPermission;
+import org.apache.accumulo.proxy.thrift.TableExistsException;
+import org.apache.accumulo.proxy.thrift.TableNotFoundException;
+import org.apache.accumulo.proxy.thrift.TablePermission;
+import org.apache.accumulo.proxy.thrift.TimeType;
+import org.apache.accumulo.proxy.thrift.UnknownScanner;
+import org.apache.accumulo.proxy.thrift.UnknownWriter;
+import org.apache.accumulo.proxy.thrift.WriterOptions;
+import org.apache.accumulo.server.util.PortUtils;
+import org.apache.accumulo.test.functional.SlowIterator;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TApplicationException;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterators;
- import com.google.common.net.HostAndPort;
+
+/**
+ * Call every method on the proxy and try to verify that it works.
+ */
+public abstract class SimpleProxyBase extends SharedMiniClusterBase {
+ private static final Logger log = LoggerFactory.getLogger(SimpleProxyBase.class);
+
+ @Override
+ protected int defaultTimeoutSeconds() {
+ return 60;
+ }
+
+ private static final long ZOOKEEPER_PROPAGATION_TIME = 10 * 1000;
+ private static TServer proxyServer;
+ private static int proxyPort;
+
+ private TestProxyClient proxyClient;
+ private org.apache.accumulo.proxy.thrift.AccumuloProxy.Client client;
+
+ private static Map<String,String> properties = new HashMap<>();
+ private static String hostname, proxyPrincipal, proxyPrimary, clientPrincipal;
+ private static File proxyKeytab, clientKeytab;
+
+ private ByteBuffer creds = null;
+
+ // Implementations can set this
+ static TProtocolFactory factory = null;
+
+ private static void waitForAccumulo(Connector c) throws Exception {
+ Iterators.size(c.createScanner(MetadataTable.NAME, Authorizations.EMPTY).iterator());
+ }
+
+ private static boolean isKerberosEnabled() {
+ return SharedMiniClusterBase.TRUE.equals(System.getProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION));
+ }
+
+ /**
+ * Does the actual test setup, invoked by the concrete test class
+ */
+ public static void setUpProxy() throws Exception {
+ assertNotNull("Implementations must initialize the TProtocolFactory", factory);
+
+ Connector c = SharedMiniClusterBase.getConnector();
+ Instance inst = c.getInstance();
+ waitForAccumulo(c);
+
+ hostname = InetAddress.getLocalHost().getCanonicalHostName();
+
+ Properties props = new Properties();
+ props.put("instance", inst.getInstanceName());
+ props.put("zookeepers", inst.getZooKeepers());
+
+ final String tokenClass;
+ if (isKerberosEnabled()) {
+ tokenClass = KerberosToken.class.getName();
+ TestingKdc kdc = getKdc();
+
+ // Create a principal+keytab for the proxy
+ proxyKeytab = new File(kdc.getKeytabDir(), "proxy.keytab");
+ hostname = InetAddress.getLocalHost().getCanonicalHostName();
+ // Set the primary because the client needs to know it
+ proxyPrimary = "proxy";
+ // Qualify with an instance
+ proxyPrincipal = proxyPrimary + "/" + hostname;
+ kdc.createPrincipal(proxyKeytab, proxyPrincipal);
+ // Tack on the realm too
+ proxyPrincipal = kdc.qualifyUser(proxyPrincipal);
+
+ props.setProperty("kerberosPrincipal", proxyPrincipal);
+ props.setProperty("kerberosKeytab", proxyKeytab.getCanonicalPath());
+ props.setProperty("thriftServerType", "sasl");
+
+ // Enabled kerberos auth
+ Configuration conf = new Configuration(false);
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+
+ // Login for the Proxy itself
+ UserGroupInformation.loginUserFromKeytab(proxyPrincipal, proxyKeytab.getAbsolutePath());
+
+ // User for tests
+ ClusterUser user = kdc.getRootUser();
+ clientPrincipal = user.getPrincipal();
+ clientKeytab = user.getKeytab();
+ } else {
+ clientPrincipal = "root";
+ tokenClass = PasswordToken.class.getName();
+ properties.put("password", SharedMiniClusterBase.getRootPassword());
+ hostname = "localhost";
+ }
+
+ props.put("tokenClass", tokenClass);
+
+ ClientConfiguration clientConfig = SharedMiniClusterBase.getCluster().getClientConfig();
+ String clientConfPath = new File(SharedMiniClusterBase.getCluster().getConfig().getConfDir(), "client.conf").getAbsolutePath();
+ props.put("clientConfigurationFile", clientConfPath);
+ properties.put("clientConfigurationFile", clientConfPath);
+
+ proxyPort = PortUtils.getRandomFreePort();
+ proxyServer = Proxy.createProxyServer(HostAndPort.fromParts(hostname, proxyPort), factory, props, clientConfig).server;
+ while (!proxyServer.isServing())
+ sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
+
+ @AfterClass
+ public static void tearDownProxy() throws Exception {
+ if (null != proxyServer) {
+ proxyServer.stop();
+ }
+
+ SharedMiniClusterBase.stopMiniCluster();
+ }
+
+ final IteratorSetting setting = new IteratorSetting(100, "slow", SlowIterator.class.getName(), Collections.singletonMap("sleepTime", "200"));
+ String tableName;
+ String namespaceName;
+ ByteBuffer badLogin;
+
+ @Before
+ public void setup() throws Exception {
+ // Create a new client for each test
+ if (isKerberosEnabled()) {
+ UserGroupInformation.loginUserFromKeytab(clientPrincipal, clientKeytab.getAbsolutePath());
+ proxyClient = new TestProxyClient(hostname, proxyPort, factory, proxyPrimary, UserGroupInformation.getCurrentUser());
+ client = proxyClient.proxy();
+ creds = client.login(clientPrincipal, properties);
+
+ TestingKdc kdc = getKdc();
+ final ClusterUser user = kdc.getClientPrincipal(0);
+ // Create another user
+ client.createLocalUser(creds, user.getPrincipal(), s2bb("unused"));
+ // Login in as that user we just created
+ UserGroupInformation.loginUserFromKeytab(user.getPrincipal(), user.getKeytab().getAbsolutePath());
+ final UserGroupInformation badUgi = UserGroupInformation.getCurrentUser();
+ // Get a "Credentials" object for the proxy
+ TestProxyClient badClient = new TestProxyClient(hostname, proxyPort, factory, proxyPrimary, badUgi);
+ try {
+ Client badProxy = badClient.proxy();
+ badLogin = badProxy.login(user.getPrincipal(), properties);
+ } finally {
+ badClient.close();
+ }
+
+ // Log back in as the test user
+ UserGroupInformation.loginUserFromKeytab(clientPrincipal, clientKeytab.getAbsolutePath());
+ // Drop test user, invalidating the credentials (not to mention not having the krb credentials anymore)
+ client.dropLocalUser(creds, user.getPrincipal());
+ } else {
+ proxyClient = new TestProxyClient(hostname, proxyPort, factory);
+ client = proxyClient.proxy();
+ creds = client.login("root", properties);
+
+ // Create 'user'
+ client.createLocalUser(creds, "user", s2bb(SharedMiniClusterBase.getRootPassword()));
+ // Log in as 'user'
+ badLogin = client.login("user", properties);
+ // Drop 'user', invalidating the credentials
+ client.dropLocalUser(creds, "user");
+ }
+
+ // Create some unique names for tables, namespaces, etc.
+ String[] uniqueNames = getUniqueNames(2);
+
+ // Create a general table to be used
+ tableName = uniqueNames[0];
+ client.createTable(creds, tableName, true, TimeType.MILLIS);
+
+ // Create a general namespace to be used
+ namespaceName = uniqueNames[1];
+ client.createNamespace(creds, namespaceName);
+ }
+
+ @After
+ public void teardown() throws Exception {
+ if (null != tableName) {
+ if (isKerberosEnabled()) {
+ UserGroupInformation.loginUserFromKeytab(clientPrincipal, clientKeytab.getAbsolutePath());
+ }
+ try {
+ if (client.tableExists(creds, tableName)) {
+ client.deleteTable(creds, tableName);
+ }
+ } catch (Exception e) {
+ log.warn("Failed to delete test table", e);
+ }
+ }
+
+ if (null != namespaceName) {
+ try {
+ if (client.namespaceExists(creds, namespaceName)) {
+ client.deleteNamespace(creds, namespaceName);
+ }
+ } catch (Exception e) {
+ log.warn("Failed to delete test namespace", e);
+ }
+ }
+
+ // Close the transport after the test
+ if (null != proxyClient) {
+ proxyClient.close();
+ }
+ }
+
+ /*
+ * Set a lower timeout for tests that should fail fast
+ */
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void addConstraintLoginFailure() throws Exception {
+ client.addConstraint(badLogin, tableName, NumericValueConstraint.class.getName());
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void addSplitsLoginFailure() throws Exception {
+ client.addSplits(badLogin, tableName, Collections.singleton(s2bb("1")));
+ }
+
+ @Test(expected = TApplicationException.class, timeout = 5000)
+ public void clearLocatorCacheLoginFailure() throws Exception {
+ client.clearLocatorCache(badLogin, tableName);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void compactTableLoginFailure() throws Exception {
+ client.compactTable(badLogin, tableName, null, null, null, true, false, null);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void cancelCompactionLoginFailure() throws Exception {
+ client.cancelCompaction(badLogin, tableName);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void createTableLoginFailure() throws Exception {
+ client.createTable(badLogin, tableName, false, TimeType.MILLIS);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void deleteTableLoginFailure() throws Exception {
+ client.deleteTable(badLogin, tableName);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void deleteRowsLoginFailure() throws Exception {
+ client.deleteRows(badLogin, tableName, null, null);
+ }
+
+ @Test(expected = TApplicationException.class, timeout = 5000)
+ public void tableExistsLoginFailure() throws Exception {
+ client.tableExists(badLogin, tableName);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void flustTableLoginFailure() throws Exception {
+ client.flushTable(badLogin, tableName, null, null, false);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void getLocalityGroupsLoginFailure() throws Exception {
+ client.getLocalityGroups(badLogin, tableName);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void getMaxRowLoginFailure() throws Exception {
+ client.getMaxRow(badLogin, tableName, Collections.<ByteBuffer> emptySet(), null, false, null, false);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void getTablePropertiesLoginFailure() throws Exception {
+ client.getTableProperties(badLogin, tableName);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void listSplitsLoginFailure() throws Exception {
+ client.listSplits(badLogin, tableName, 10000);
+ }
+
+ @Test(expected = TApplicationException.class, timeout = 5000)
+ public void listTablesLoginFailure() throws Exception {
+ client.listTables(badLogin);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void listConstraintsLoginFailure() throws Exception {
+ client.listConstraints(badLogin, tableName);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void mergeTabletsLoginFailure() throws Exception {
+ client.mergeTablets(badLogin, tableName, null, null);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void offlineTableLoginFailure() throws Exception {
+ client.offlineTable(badLogin, tableName, false);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void onlineTableLoginFailure() throws Exception {
+ client.onlineTable(badLogin, tableName, false);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void removeConstraintLoginFailure() throws Exception {
+ client.removeConstraint(badLogin, tableName, 0);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void removeTablePropertyLoginFailure() throws Exception {
+ client.removeTableProperty(badLogin, tableName, Property.TABLE_FILE_MAX.getKey());
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void renameTableLoginFailure() throws Exception {
+ client.renameTable(badLogin, tableName, "someTableName");
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void setLocalityGroupsLoginFailure() throws Exception {
+ Map<String,Set<String>> groups = new HashMap<>();
+ groups.put("group1", Collections.singleton("cf1"));
+ groups.put("group2", Collections.singleton("cf2"));
+ client.setLocalityGroups(badLogin, tableName, groups);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void setTablePropertyLoginFailure() throws Exception {
+ client.setTableProperty(badLogin, tableName, Property.TABLE_FILE_MAX.getKey(), "0");
+ }
+
+ @Test(expected = TException.class, timeout = 5000)
+ public void tableIdMapLoginFailure() throws Exception {
+ client.tableIdMap(badLogin);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void getSiteConfigurationLoginFailure() throws Exception {
+ client.getSiteConfiguration(badLogin);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void getSystemConfigurationLoginFailure() throws Exception {
+ client.getSystemConfiguration(badLogin);
+ }
+
+ @Test(expected = TException.class, timeout = 5000)
+ public void getTabletServersLoginFailure() throws Exception {
+ client.getTabletServers(badLogin);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void getActiveScansLoginFailure() throws Exception {
+ client.getActiveScans(badLogin, "fake");
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void getActiveCompactionsLoginFailure() throws Exception {
+ client.getActiveCompactions(badLogin, "fakse");
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void removePropertyLoginFailure() throws Exception {
+ client.removeProperty(badLogin, "table.split.threshold");
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void setPropertyLoginFailure() throws Exception {
+ client.setProperty(badLogin, "table.split.threshold", "500M");
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void testClassLoadLoginFailure() throws Exception {
+ client.testClassLoad(badLogin, DevNull.class.getName(), SortedKeyValueIterator.class.getName());
+ }
+
+ @Test(timeout = 5000)
+ public void authenticateUserLoginFailure() throws Exception {
+ if (!isKerberosEnabled()) {
+ try {
+ // Not really a relevant test for kerberos
+ client.authenticateUser(badLogin, "root", s2pp(SharedMiniClusterBase.getRootPassword()));
+ fail("Expected AccumuloSecurityException");
+ } catch (AccumuloSecurityException e) {
+ // Expected
+ return;
+ }
+ }
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void changeUserAuthorizationsLoginFailure() throws Exception {
+ HashSet<ByteBuffer> auths = new HashSet<>(Arrays.asList(s2bb("A"), s2bb("B")));
+ client.changeUserAuthorizations(badLogin, "stooge", auths);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void changePasswordLoginFailure() throws Exception {
+ client.changeLocalUserPassword(badLogin, "stooge", s2bb(""));
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void createUserLoginFailure() throws Exception {
+ client.createLocalUser(badLogin, "stooge", s2bb("password"));
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void dropUserLoginFailure() throws Exception {
+ client.dropLocalUser(badLogin, "stooge");
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void getUserAuthorizationsLoginFailure() throws Exception {
+ client.getUserAuthorizations(badLogin, "stooge");
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void grantSystemPermissionLoginFailure() throws Exception {
+ client.grantSystemPermission(badLogin, "stooge", SystemPermission.CREATE_TABLE);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void grantTablePermissionLoginFailure() throws Exception {
+ client.grantTablePermission(badLogin, "root", tableName, TablePermission.WRITE);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void hasSystemPermissionLoginFailure() throws Exception {
+ client.hasSystemPermission(badLogin, "stooge", SystemPermission.CREATE_TABLE);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void hasTablePermission() throws Exception {
+ client.hasTablePermission(badLogin, "root", tableName, TablePermission.WRITE);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void listLocalUsersLoginFailure() throws Exception {
+ client.listLocalUsers(badLogin);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void revokeSystemPermissionLoginFailure() throws Exception {
+ client.revokeSystemPermission(badLogin, "stooge", SystemPermission.CREATE_TABLE);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void revokeTablePermissionLoginFailure() throws Exception {
+ client.revokeTablePermission(badLogin, "root", tableName, TablePermission.ALTER_TABLE);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void createScannerLoginFailure() throws Exception {
+ client.createScanner(badLogin, tableName, new ScanOptions());
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void createBatchScannerLoginFailure() throws Exception {
+ client.createBatchScanner(badLogin, tableName, new BatchScanOptions());
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void updateAndFlushLoginFailure() throws Exception {
+ client.updateAndFlush(badLogin, tableName, new HashMap<ByteBuffer,List<ColumnUpdate>>());
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void createWriterLoginFailure() throws Exception {
+ client.createWriter(badLogin, tableName, new WriterOptions());
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void attachIteratorLoginFailure() throws Exception {
+ client.attachIterator(badLogin, "slow", setting, EnumSet.allOf(IteratorScope.class));
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void checkIteratorLoginFailure() throws Exception {
+ client.checkIteratorConflicts(badLogin, tableName, setting, EnumSet.allOf(IteratorScope.class));
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void cloneTableLoginFailure() throws Exception {
+ client.cloneTable(badLogin, tableName, tableName + "_clone", false, null, null);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void exportTableLoginFailure() throws Exception {
+ client.exportTable(badLogin, tableName, "/tmp");
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void importTableLoginFailure() throws Exception {
+ client.importTable(badLogin, "testify", "/tmp");
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void getIteratorSettingLoginFailure() throws Exception {
+ client.getIteratorSetting(badLogin, tableName, "foo", IteratorScope.SCAN);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void listIteratorsLoginFailure() throws Exception {
+ client.listIterators(badLogin, tableName);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void removeIteratorLoginFailure() throws Exception {
+ client.removeIterator(badLogin, tableName, "name", EnumSet.allOf(IteratorScope.class));
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void splitRangeByTabletsLoginFailure() throws Exception {
+ client.splitRangeByTablets(badLogin, tableName, client.getRowRange(ByteBuffer.wrap("row".getBytes(UTF_8))), 10);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void importDirectoryLoginFailure() throws Exception {
+ MiniAccumuloClusterImpl cluster = SharedMiniClusterBase.getCluster();
+ Path base = cluster.getTemporaryPath();
+ Path importDir = new Path(base, "importDir");
+ Path failuresDir = new Path(base, "failuresDir");
+ assertTrue(cluster.getFileSystem().mkdirs(importDir));
+ assertTrue(cluster.getFileSystem().mkdirs(failuresDir));
+ client.importDirectory(badLogin, tableName, importDir.toString(), failuresDir.toString(), true);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void pingTabletServerLoginFailure() throws Exception {
+ client.pingTabletServer(badLogin, "fake");
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void loginFailure() throws Exception {
+ client.login("badUser", properties);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void testTableClassLoadLoginFailure() throws Exception {
+ client.testTableClassLoad(badLogin, tableName, VersioningIterator.class.getName(), SortedKeyValueIterator.class.getName());
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void createConditionalWriterLoginFailure() throws Exception {
+ client.createConditionalWriter(badLogin, tableName, new ConditionalWriterOptions());
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void grantNamespacePermissionLoginFailure() throws Exception {
+ client.grantNamespacePermission(badLogin, "stooge", namespaceName, NamespacePermission.ALTER_NAMESPACE);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void hasNamespacePermissionLoginFailure() throws Exception {
+ client.hasNamespacePermission(badLogin, "stooge", namespaceName, NamespacePermission.ALTER_NAMESPACE);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void revokeNamespacePermissionLoginFailure() throws Exception {
+ client.revokeNamespacePermission(badLogin, "stooge", namespaceName, NamespacePermission.ALTER_NAMESPACE);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void listNamespacesLoginFailure() throws Exception {
+ client.listNamespaces(badLogin);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void namespaceExistsLoginFailure() throws Exception {
+ client.namespaceExists(badLogin, namespaceName);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void createNamespaceLoginFailure() throws Exception {
+ client.createNamespace(badLogin, "abcdef");
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void deleteNamespaceLoginFailure() throws Exception {
+ client.deleteNamespace(badLogin, namespaceName);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void renameNamespaceLoginFailure() throws Exception {
+ client.renameNamespace(badLogin, namespaceName, "abcdef");
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void setNamespacePropertyLoginFailure() throws Exception {
+ client.setNamespaceProperty(badLogin, namespaceName, "table.compaction.major.ratio", "4");
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void removeNamespacePropertyLoginFailure() throws Exception {
+ client.removeNamespaceProperty(badLogin, namespaceName, "table.compaction.major.ratio");
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void getNamespacePropertiesLoginFailure() throws Exception {
+ client.getNamespaceProperties(badLogin, namespaceName);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void namespaceIdMapLoginFailure() throws Exception {
+ client.namespaceIdMap(badLogin);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void attachNamespaceIteratorLoginFailure() throws Exception {
+ IteratorSetting setting = new IteratorSetting(100, "DebugTheThings", DebugIterator.class.getName(), Collections.<String,String> emptyMap());
+ client.attachNamespaceIterator(badLogin, namespaceName, setting, EnumSet.allOf(IteratorScope.class));
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void removeNamespaceIteratorLoginFailure() throws Exception {
+ client.removeNamespaceIterator(badLogin, namespaceName, "DebugTheThings", EnumSet.allOf(IteratorScope.class));
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void getNamespaceIteratorSettingLoginFailure() throws Exception {
+ client.getNamespaceIteratorSetting(badLogin, namespaceName, "DebugTheThings", IteratorScope.SCAN);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void listNamespaceIteratorsLoginFailure() throws Exception {
+ client.listNamespaceIterators(badLogin, namespaceName);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void checkNamespaceIteratorConflictsLoginFailure() throws Exception {
+ IteratorSetting setting = new IteratorSetting(100, "DebugTheThings", DebugIterator.class.getName(), Collections.<String,String> emptyMap());
+ client.checkNamespaceIteratorConflicts(badLogin, namespaceName, setting, EnumSet.allOf(IteratorScope.class));
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void addNamespaceConstraintLoginFailure() throws Exception {
+ client.addNamespaceConstraint(badLogin, namespaceName, MaxMutationSize.class.getName());
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void removeNamespaceConstraintLoginFailure() throws Exception {
+ client.removeNamespaceConstraint(badLogin, namespaceName, 1);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void listNamespaceConstraintsLoginFailure() throws Exception {
+ client.listNamespaceConstraints(badLogin, namespaceName);
+ }
+
+ @Test(expected = AccumuloSecurityException.class, timeout = 5000)
+ public void testNamespaceClassLoadLoginFailure() throws Exception {
+ client.testNamespaceClassLoad(badLogin, namespaceName, DebugIterator.class.getName(), SortedKeyValueIterator.class.getName());
+ }
+
+ @Test
+ public void tableNotFound() throws Exception {
+ final String doesNotExist = "doesNotExists";
+ try {
+ client.addConstraint(creds, doesNotExist, NumericValueConstraint.class.getName());
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.addSplits(creds, doesNotExist, Collections.<ByteBuffer> emptySet());
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ final IteratorSetting setting = new IteratorSetting(100, "slow", SlowIterator.class.getName(), Collections.singletonMap("sleepTime", "200"));
+ try {
+ client.attachIterator(creds, doesNotExist, setting, EnumSet.allOf(IteratorScope.class));
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.cancelCompaction(creds, doesNotExist);
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.checkIteratorConflicts(creds, doesNotExist, setting, EnumSet.allOf(IteratorScope.class));
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.clearLocatorCache(creds, doesNotExist);
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ final String TABLE_TEST = getUniqueNames(1)[0];
+ client.cloneTable(creds, doesNotExist, TABLE_TEST, false, null, null);
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.compactTable(creds, doesNotExist, null, null, null, true, false, null);
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.createBatchScanner(creds, doesNotExist, new BatchScanOptions());
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.createScanner(creds, doesNotExist, new ScanOptions());
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.createWriter(creds, doesNotExist, new WriterOptions());
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.deleteRows(creds, doesNotExist, null, null);
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.deleteTable(creds, doesNotExist);
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.exportTable(creds, doesNotExist, "/tmp");
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.flushTable(creds, doesNotExist, null, null, false);
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.getIteratorSetting(creds, doesNotExist, "foo", IteratorScope.SCAN);
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.getLocalityGroups(creds, doesNotExist);
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.getMaxRow(creds, doesNotExist, Collections.<ByteBuffer> emptySet(), null, false, null, false);
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.getTableProperties(creds, doesNotExist);
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.grantTablePermission(creds, "root", doesNotExist, TablePermission.WRITE);
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.hasTablePermission(creds, "root", doesNotExist, TablePermission.WRITE);
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ MiniAccumuloClusterImpl cluster = SharedMiniClusterBase.getCluster();
+ Path base = cluster.getTemporaryPath();
+ Path importDir = new Path(base, "importDir");
+ Path failuresDir = new Path(base, "failuresDir");
+ assertTrue(cluster.getFileSystem().mkdirs(importDir));
+ assertTrue(cluster.getFileSystem().mkdirs(failuresDir));
+ client.importDirectory(creds, doesNotExist, importDir.toString(), failuresDir.toString(), true);
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.listConstraints(creds, doesNotExist);
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.listSplits(creds, doesNotExist, 10000);
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.mergeTablets(creds, doesNotExist, null, null);
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.offlineTable(creds, doesNotExist, false);
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.onlineTable(creds, doesNotExist, false);
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.removeConstraint(creds, doesNotExist, 0);
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.removeIterator(creds, doesNotExist, "name", EnumSet.allOf(IteratorScope.class));
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.removeTableProperty(creds, doesNotExist, Property.TABLE_FILE_MAX.getKey());
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.renameTable(creds, doesNotExist, "someTableName");
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.revokeTablePermission(creds, "root", doesNotExist, TablePermission.ALTER_TABLE);
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.setTableProperty(creds, doesNotExist, Property.TABLE_FILE_MAX.getKey(), "0");
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.splitRangeByTablets(creds, doesNotExist, client.getRowRange(ByteBuffer.wrap("row".getBytes(UTF_8))), 10);
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.updateAndFlush(creds, doesNotExist, new HashMap<ByteBuffer,List<ColumnUpdate>>());
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.getDiskUsage(creds, Collections.singleton(doesNotExist));
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.testTableClassLoad(creds, doesNotExist, VersioningIterator.class.getName(), SortedKeyValueIterator.class.getName());
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ try {
+ client.createConditionalWriter(creds, doesNotExist, new ConditionalWriterOptions());
+ fail("exception not thrown");
+ } catch (TableNotFoundException ex) {}
+ }
+
+ @Test
+ public void namespaceNotFound() throws Exception {
+ final String doesNotExist = "doesNotExists";
+ try {
+ client.deleteNamespace(creds, doesNotExist);
+ fail("exception not thrown");
+ } catch (NamespaceNotFoundException ex) {}
+ try {
+ client.renameNamespace(creds, doesNotExist, "abcdefg");
+ fail("exception not thrown");
+ } catch (NamespaceNotFoundException ex) {}
+ try {
+ client.setNamespaceProperty(creds, doesNotExist, "table.compaction.major.ratio", "4");
+ fail("exception not thrown");
+ } catch (NamespaceNotFoundException ex) {}
+ try {
+ client.removeNamespaceProperty(creds, doesNotExist, "table.compaction.major.ratio");
+ fail("exception not thrown");
+ } catch (NamespaceNotFoundException ex) {}
+ try {
+ client.getNamespaceProperties(creds, doesNotExist);
+ fail("exception not thrown");
+ } catch (NamespaceNotFoundException ex) {}
+ try {
+ IteratorSetting setting = new IteratorSetting(100, "DebugTheThings", DebugIterator.class.getName(), Collections.<String,String> emptyMap());
+ client.attachNamespaceIterator(creds, doesNotExist, setting, EnumSet.allOf(IteratorScope.class));
+ fail("exception not thrown");
+ } catch (NamespaceNotFoundException ex) {}
+ try {
+ client.removeNamespaceIterator(creds, doesNotExist, "DebugTheThings", EnumSet.allOf(IteratorScope.class));
+ fail("exception not thrown");
+ } catch (NamespaceNotFoundException ex) {}
+ try {
+ client.getNamespaceIteratorSetting(creds, doesNotExist, "DebugTheThings", IteratorScope.SCAN);
+ fail("exception not thrown");
+ } catch (NamespaceNotFoundException ex) {}
+ try {
+ client.listNamespaceIterators(creds, doesNotExist);
+ fail("exception not thrown");
+ } catch (NamespaceNotFoundException ex) {}
+ try {
+ IteratorSetting setting = new IteratorSetting(100, "DebugTheThings", DebugIterator.class.getName(), Collections.<String,String> emptyMap());
+ client.checkNamespaceIteratorConflicts(creds, doesNotExist, setting, EnumSet.allOf(IteratorScope.class));
+ fail("exception not thrown");
+ } catch (NamespaceNotFoundException ex) {}
+ try {
+ client.addNamespaceConstraint(creds, doesNotExist, MaxMutationSize.class.getName());
+ fail("exception not thrown");
+ } catch (NamespaceNotFoundException ex) {}
+ try {
+ client.removeNamespaceConstraint(creds, doesNotExist, 1);
+ fail("exception not thrown");
+ } catch (NamespaceNotFoundException ex) {}
+ try {
+ client.listNamespaceConstraints(creds, doesNotExist);
+ fail("exception not thrown");
+ } catch (NamespaceNotFoundException ex) {}
+ try {
+ client.testNamespaceClassLoad(creds, doesNotExist, DebugIterator.class.getName(), SortedKeyValueIterator.class.getName());
+ fail("exception not thrown");
+ } catch (NamespaceNotFoundException ex) {}
+ }
+
+ @Test
+ public void testExists() throws Exception {
+ client.createTable(creds, "ett1", false, TimeType.MILLIS);
+ client.createTable(creds, "ett2", false, TimeType.MILLIS);
+ try {
+ client.createTable(creds, "ett1", false, TimeType.MILLIS);
+ fail("exception not thrown");
+ } catch (TableExistsException tee) {}
+ try {
+ client.renameTable(creds, "ett1", "ett2");
+ fail("exception not thrown");
+ } catch (TableExistsException tee) {}
+ try {
+ client.cloneTable(creds, "ett1", "ett2", false, new HashMap<String,String>(), new HashSet<String>());
+ fail("exception not thrown");
+ } catch (TableExistsException tee) {}
+ }
+
+ @Test
+ public void testNamespaceExists() throws Exception {
+ client.createNamespace(creds, "foobar");
+ try {
+ client.createNamespace(creds, namespaceName);
+ fail("exception not thrown");
+ } catch (NamespaceExistsException ex) {}
+ try {
+ client.renameNamespace(creds, "foobar", namespaceName);
+ fail("exception not thrown");
+ } catch (NamespaceExistsException ex) {}
+ }
+
+ @Test(expected = NamespaceNotEmptyException.class)
+ public void testNamespaceNotEmpty() throws Exception {
+ client.createTable(creds, namespaceName + ".abcdefg", true, TimeType.MILLIS);
+ client.deleteNamespace(creds, namespaceName);
+ }
+
+ @Test
+ public void testUnknownScanner() throws Exception {
+ String scanner = client.createScanner(creds, tableName, null);
+ assertFalse(client.hasNext(scanner));
+ client.closeScanner(scanner);
+
+ try {
+ client.hasNext(scanner);
+ fail("exception not thrown");
+ } catch (UnknownScanner us) {}
+
+ try {
+ client.closeScanner(scanner);
+ fail("exception not thrown");
+ } catch (UnknownScanner us) {}
+
+ try {
+ client.nextEntry("99999999");
+ fail("exception not thrown");
+ } catch (UnknownScanner us) {}
+ try {
+ client.nextK("99999999", 6);
+ fail("exception not thrown");
+ } catch (UnknownScanner us) {}
+ try {
+ client.hasNext("99999999");
+ fail("exception not thrown");
+ } catch (UnknownScanner us) {}
+ try {
+ client.hasNext(UUID.randomUUID().toString());
+ fail("exception not thrown");
+ } catch (UnknownScanner us) {}
+ }
+
+ @Test
+ public void testUnknownWriter() throws Exception {
+ String writer = client.createWriter(creds, tableName, null);
+ client.update(writer, mutation("row0", "cf", "cq", "value"));
+ client.flush(writer);
+ client.update(writer, mutation("row2", "cf", "cq", "value2"));
+ client.closeWriter(writer);
+
+ // this is a oneway call, so it does not throw exceptions
+ client.update(writer, mutation("row2", "cf", "cq", "value2"));
+
+ try {
+ client.flush(writer);
+ fail("exception not thrown");
+ } catch (UnknownWriter uw) {}
+ try {
+ client.flush("99999");
+ fail("exception not thrown");
+ } catch (UnknownWriter uw) {}
+ try {
+ client.flush(UUID.randomUUID().toString());
+ fail("exception not thrown");
+ } catch (UnknownWriter uw) {}
+ try {
+ client.closeWriter("99999");
+ fail("exception not thrown");
+ } catch (UnknownWriter uw) {}
+ }
+
+ @Test
+ public void testDelete() throws Exception {
+ client.updateAndFlush(creds, tableName, mutation("row0", "cf", "cq", "value"));
+
+ assertScan(new String[][] {{"row0", "cf", "cq", "value"}}, tableName);
+
+ ColumnUpdate upd = new ColumnUpdate(s2bb("cf"), s2bb("cq"));
+ upd.setDeleteCell(false);
+ Map<ByteBuffer,List<ColumnUpdate>> notDelete = Collections.singletonMap(s2bb("row0"), Collections.singletonList(upd));
+ client.updateAndFlush(creds, tableName, notDelete);
+ String scanner = client.createScanner(creds, tableName, null);
+ ScanResult entries = client.nextK(scanner, 10);
+ client.closeScanner(scanner);
+ assertFalse(entries.more);
+ assertEquals("Results: " + entries.results, 1, entries.results.size());
+
+ upd = new ColumnUpdate(s2bb("cf"), s2bb("cq"));
+ upd.setDeleteCell(true);
+ Map<ByteBuffer,List<ColumnUpdate>> delete = Collections.singletonMap(s2bb("row0"), Collections.singletonList(upd));
+
+ client.updateAndFlush(creds, tableName, delete);
+
+ assertScan(new String[][] {}, tableName);
+ }
+
+ @Test
+ public void testSystemProperties() throws Exception {
+ Map<String,String> cfg = client.getSiteConfiguration(creds);
+
+ // set a property in zookeeper
+ client.setProperty(creds, "table.split.threshold", "500M");
+
+ // check that we can read it
+ for (int i = 0; i < 5; i++) {
+ cfg = client.getSystemConfiguration(creds);
+ if ("500M".equals(cfg.get("table.split.threshold")))
+ break;
+ sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ }
+ assertEquals("500M", cfg.get("table.split.threshold"));
+
+ // unset the setting, check that it's not what it was
+ client.removeProperty(creds, "table.split.threshold");
+ for (int i = 0; i < 5; i++) {
+ cfg = client.getSystemConfiguration(creds);
+ if (!"500M".equals(cfg.get("table.split.threshold")))
+ break;
+ sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ }
+ assertNotEquals("500M", cfg.get("table.split.threshold"));
+ }
+
+ @Test
+ public void pingTabletServers() throws Exception {
+ int tservers = 0;
+ for (String tserver : client.getTabletServers(creds)) {
+ client.pingTabletServer(creds, tserver);
+ tservers++;
+ }
+ assertTrue(tservers > 0);
+ }
+
+ @Test
+ public void testSiteConfiguration() throws Exception {
+ // get something we know is in the site config
+ MiniAccumuloClusterImpl cluster = SharedMiniClusterBase.getCluster();
+ Map<String,String> cfg = client.getSiteConfiguration(creds);
+ assertTrue(cfg.get("instance.dfs.dir").startsWith(cluster.getConfig().getAccumuloDir().getAbsolutePath()));
+ }
+
+ @Test
+ public void testClassLoad() throws Exception {
+ // try to load some classes via the proxy
+ assertTrue(client.testClassLoad(creds, DevNull.class.getName(), SortedKeyValueIterator.class.getName()));
+ assertFalse(client.testClassLoad(creds, "foo.bar", SortedKeyValueIterator.class.getName()));
+ }
+
+ @Test
+ public void attachIteratorsWithScans() throws Exception {
+ if (client.tableExists(creds, "slow")) {
+ client.deleteTable(creds, "slow");
+ }
+
+ // create a table that's very slow, so we can look for scans
+ client.createTable(creds, "slow", true, TimeType.MILLIS);
+ IteratorSetting setting = new IteratorSetting(100, "slow", SlowIterator.class.getName(), Collections.singletonMap("sleepTime", "250"));
+ client.attachIterator(creds, "slow", setting, EnumSet.allOf(IteratorScope.class));
+
+ // Should take 10 seconds to read every record
+ for (int i = 0; i < 40; i++) {
+ client.updateAndFlush(creds, "slow", mutation("row" + i, "cf", "cq", "value"));
+ }
+
+ // scan
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ String scanner;
+ TestProxyClient proxyClient2 = null;
+ try {
+ if (isKerberosEnabled()) {
+ UserGroupInformation.loginUserFromKeytab(clientPrincipal, clientKeytab.getAbsolutePath());
+ proxyClient2 = new TestProxyClient(hostname, proxyPort, factory, proxyPrimary, UserGroupInformation.getCurrentUser());
+ } else {
+ proxyClient2 = new TestProxyClient(hostname, proxyPort, factory);
+ }
+
+ Client client2 = proxyClient2.proxy();
+ scanner = client2.createScanner(creds, "slow", null);
+ client2.nextK(scanner, 10);
+ client2.closeScanner(scanner);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (null != proxyClient2) {
+ proxyClient2.close();
+ }
+ }
+ }
+ };
+ t.start();
+
+ // look for the scan many times
+ List<ActiveScan> scans = new ArrayList<>();
+ for (int i = 0; i < 100 && scans.isEmpty(); i++) {
+ for (String tserver : client.getTabletServers(creds)) {
+ List<ActiveScan> scansForServer = client.getActiveScans(creds, tserver);
+ for (ActiveScan scan : scansForServer) {
+ if (clientPrincipal.equals(scan.getUser())) {
+ scans.add(scan);
+ }
+ }
+
+ if (!scans.isEmpty())
+ break;
+ sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
+ }
+ t.join();
+
+ assertFalse("Expected to find scans, but found none", scans.isEmpty());
+ boolean found = false;
+ Map<String,String> map = null;
+ for (int i = 0; i < scans.size() && !found; i++) {
+ ActiveScan scan = scans.get(i);
+ if (clientPrincipal.equals(scan.getUser())) {
+ assertTrue(ScanState.RUNNING.equals(scan.getState()) || ScanState.QUEUED.equals(scan.getState()));
+ assertEquals(ScanType.SINGLE, scan.getType());
+ assertEquals("slow", scan.getTable());
+
+ map = client.tableIdMap(creds);
+ assertEquals(map.get("slow"), scan.getExtent().tableId);
+ assertTrue(scan.getExtent().endRow == null);
+ assertTrue(scan.getExtent().prevEndRow == null);
+ found = true;
+ }
+ }
+
+ assertTrue("Could not find a scan against the 'slow' table", found);
+ }
+
+ @Test
+ public void attachIteratorWithCompactions() throws Exception {
+ if (client.tableExists(creds, "slow")) {
+ client.deleteTable(creds, "slow");
+ }
+
+ // create a table that's very slow, so we can look for compactions
+ client.createTable(creds, "slow", true, TimeType.MILLIS);
+ IteratorSetting setting = new IteratorSetting(100, "slow", SlowIterator.class.getName(), Collections.singletonMap("sleepTime", "250"));
+ client.attachIterator(creds, "slow", setting, EnumSet.allOf(IteratorScope.class));
+
+ // Should take 10 seconds to read every record
+ for (int i = 0; i < 40; i++) {
+ client.updateAndFlush(creds, "slow", mutation("row" + i, "cf", "cq", "value"));
+ }
+
+ Map<String,String> map = client.tableIdMap(creds);
+
+ // start a compaction
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ TestProxyClient proxyClient2 = null;
+ try {
+ if (isKerberosEnabled()) {
+ UserGroupInformation.loginUserFromKeytab(clientPrincipal, clientKeytab.getAbsolutePath());
+ proxyClient2 = new TestProxyClient(hostname, proxyPort, factory, proxyPrimary, UserGroupInformation.getCurrentUser());
+ } else {
+ proxyClient2 = new TestProxyClient(hostname, proxyPort, factory);
+ }
+ Client client2 = proxyClient2.proxy();
+ client2.compactTable(creds, "slow", null, null, null, true, true, null);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (null != proxyClient2) {
+ proxyClient2.close();
+ }
+ }
+ }
+ };
+ t.start();
+
+ final String desiredTableId = map.get("slow");
+
+ // Make sure we can find the slow table
+ assertNotNull(desiredTableId);
+
+ // try to catch it in the act
+ List<ActiveCompaction> compactions = new ArrayList<>();
+ for (int i = 0; i < 100 && compactions.isEmpty(); i++) {
+ // Iterate over the tservers
+ for (String tserver : client.getTabletServers(creds)) {
+ // And get the compactions on each
+ List<ActiveCompaction> compactionsOnServer = client.getActiveCompactions(creds, tserver);
+ for (ActiveCompaction compact : compactionsOnServer) {
+ // There might be other compactions occurring (e.g. on METADATA) in which
+ // case we want to prune out those that aren't for our slow table
+ if (desiredTableId.equals(compact.getExtent().tableId)) {
+ compactions.add(compact);
+ }
+ }
+
+ // If we found a compaction for the table we wanted, so we can stop looking
+ if (!compactions.isEmpty())
+ break;
+ }
+ sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
+ }
+ t.join();
+
+ // verify the compaction information
+ assertFalse(compactions.isEmpty());
+ for (ActiveCompaction c : compactions) {
+ if (desiredTableId.equals(c.getExtent().tableId)) {
+ assertTrue(c.inputFiles.isEmpty());
+ assertEquals(CompactionType.MINOR, c.getType());
+ assertEquals(CompactionReason.USER, c.getReason());
+ assertEquals("", c.localityGroup);
+ assertTrue(c.outputFile.contains("default_tablet"));
+
+ return;
+ }
+ }
+
+ fail("Expection to find running compaction for table 'slow' but did not find one");
+ }
+
+ @Test
+ public void userAuthentication() throws Exception {
+ if (isKerberosEnabled()) {
+ assertTrue(client.authenticateUser(creds, clientPrincipal, Collections.<String,String> emptyMap()));
+ // Can't really authenticate "badly" at the application level w/ kerberos. It's going to fail to even set up
+ // an RPC
+ } else {
+ // check password
+ assertTrue(client.authenticateUser(creds, "root", s2pp(SharedMiniClusterBase.getRootPassword())));
+ assertFalse(client.authenticateUser(creds, "root", s2pp("")));
+ }
+ }
+
+ @Test
+ public void userManagement() throws Exception {
+
+ String user;
+ ClusterUser otherClient = null;
+ ByteBuffer password = s2bb("password");
+ if (isKerberosEnabled()) {
+ otherClient = getKdc().getClientPrincipal(1);
+ user = otherClient.getPrincipal();
+ } else {
+ user = getUniqueNames(1)[0];
+ }
+
+ // create a user
+ client.createLocalUser(creds, user, password);
+ // change auths
+ Set<String> users = client.listLocalUsers(creds);
+ Set<String> expectedUsers = new HashSet<>(Arrays.asList(clientPrincipal, user));
+ assertTrue("Did not find all expected users: " + expectedUsers, users.containsAll(expectedUsers));
+ HashSet<ByteBuffer> auths = new HashSet<>(Arrays.asList(s2bb("A"), s2bb("B")));
+ client.changeUserAuthorizations(creds, user, auths);
+ List<ByteBuffer> update = client.getUserAuthorizations(creds, user);
+ assertEquals(auths, new HashSet<>(update));
+
+ // change password
+ if (!isKerberosEnabled()) {
+ password = s2bb("");
+ client.changeLocalUserPassword(creds, user, password);
+ assertTrue(client.authenticateUser(creds, user, s2pp(ByteBufferUtil.toString(password))));
+ }
+
+ if (isKerberosEnabled()) {
+ UserGroupInformation.loginUserFromKeytab(otherClient.getPrincipal(), otherClient.getKeytab().getAbsolutePath());
+ final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ // Re-login in and make a new connection. Can't use the previous one
+
+ TestProxyClient otherProxyClient = null;
+ try {
+ otherProxyClient = new TestProxyClient(hostname, proxyPort, factory, proxyPrimary, ugi);
+ otherProxyClient.proxy().login(user, Collections.<String,String> emptyMap());
+ } finally {
+ if (null != otherProxyClient) {
+ otherProxyClient.close();
+ }
+ }
+ } else {
+ // check login with new password
+ client.login(user, s2pp(ByteBufferUtil.toString(password)));
+ }
+ }
+
+ @Test
+ public void userPermissions() throws Exception {
+ String userName = getUniqueNames(1)[0];
+ ClusterUser otherClient = null;
+ ByteBuffer password = s2bb("password");
+ ByteBuffer user;
+
+ TestProxyClient origProxyClient = null;
+ Client origClient = null;
+ TestProxyClient userProxyClient = null;
+ Client userClient = null;
+
+ if (isKerberosEnabled()) {
+ otherClient = getKdc().getClientPrincipal(1);
+ userName = otherClient.getPrincipal();
+
+ UserGroupInformation.loginUserFromKeytab(otherClient.getPrincipal(), otherClient.getKeytab().getAbsolutePath());
+ final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ // Re-login in and make a new connection. Can't use the previous one
+
+ userProxyClient = new TestProxyClient(hostname, proxyPort, factory, proxyPrimary, ugi);
+
+ origProxyClient = proxyClient;
+ origClient = client;
+ userClient = client = userProxyClient.proxy();
+
+ user = client.login(userName, Collections.<String,String> emptyMap());
+ } else {
+ userName = getUniqueNames(1)[0];
+ // create a user
+ client.createLocalUser(creds, userName, password);
+ user = client.login(userName, s2pp(ByteBufferUtil.toString(password)));
+ }
+
+ // check permission failure
+ try {
+ client.createTable(user, "fail", true, TimeType.MILLIS);
+ fail("should not create the table");
+ } catch (AccumuloSecurityException ex) {
+ if (isKerberosEnabled()) {
+ // Switch back to original client
+ UserGroupInformation.loginUserFromKeytab(clientPrincipal, clientKeytab.getAbsolutePath());
+ client = origClient;
+ }
+ assertFalse(client.listTables(creds).contains("fail"));
+ }
+ // grant permissions and test
+ assertFalse(client.hasSystemPermission(creds, userName, SystemPermission.CREATE_TABLE));
+ client.grantSystemPermission(creds, userName, SystemPermission.CREATE_TABLE);
+ assertTrue(client.hasSystemPermission(creds, userName, SystemPermission.CREATE_TABLE));
+ if (isKerberosEnabled()) {
+ // Switch back to the extra user
+ UserGroupInformation.loginUserFromKeytab(otherClient.getPrincipal(), otherClient.getKeytab().getAbsolutePath());
+ client = userClient;
+ }
+ client.createTable(user, "success", true, TimeType.MILLIS);
+ if (isKerberosEnabled()) {
+ // Switch back to original client
+ UserGroupInformation.loginUserFromKeytab(clientPrincipal, clientKeytab.getAbsolutePath());
+ client = origClient;
+ }
+ assertTrue(client.listTables(creds).contains("success"));
+
+ // revoke permissions
+ client.revokeSystemPermission(creds, userName, SystemPermission.CREATE_TABLE);
+ assertFalse(client.hasSystemPermission(creds, userName, SystemPermission.CREATE_TABLE));
+ try {
+ if (isKerberosEnabled()) {
+ // Switch back to the extra user
+ UserGroupInformation.loginUserFromKeytab(otherClient.getPrincipal(), otherClient.getKeytab().getAbsolutePath());
+ client = userClient;
+ }
+ client.createTable(user, "fail", true, TimeType.MILLIS);
+ fail("should not create the table");
+ } catch (AccumuloSecurityException ex) {
+ if (isKerberosEnabled()) {
+ // Switch back to original client
+ UserGroupInformation.loginUserFromKeytab(clientPrincipal, clientKeytab.getAbsolutePath());
+ client = origClient;
+ }
+ assertFalse(client.listTables(creds).contains("fail"));
+ }
+ // denied!
+ try {
+ if (isKerberosEnabled()) {
+ // Switch back to the extra user
+ UserGroupInformation.loginUserFromKeytab(otherClient.getPrincipal(), otherClient.getKeytab().getAbsolutePath());
+ client = userClient;
+ }
+ String scanner = client.createScanner(user, tableName, null);
+ client.nextK(scanner, 100);
+ fail("stooge should not read table test");
+ } catch (AccumuloSecurityException ex) {}
+
+ if (isKerberosEnabled()) {
+ // Switch back to original client
+ UserGroupInformation.loginUserFromKeytab(clientPrincipal, clientKeytab.getAbsolutePath());
+ client = origClient;
+ }
+
+ // grant
+ assertFalse(client.hasTablePermission(creds, userName, tableName, TablePermission.READ));
+ client.grantTablePermission(creds, userName, tableName, TablePermission.READ);
+ assertTrue(client.hasTablePermission(creds, userName, tableName, TablePermission.READ));
+
+ if (isKerberosEnabled()) {
+ // Switch back to the extra user
+ UserGroupInformation.loginUserFromKeytab(otherClient.getPrincipal(), otherClient.getKeytab().getAbsolutePath());
+ client = userClient;
+ }
+ String scanner = client.createScanner(user, tableName, null);
+ client.nextK(scanner, 10);
+ client.closeScanner(scanner);
+
+ if (isKerberosEnabled()) {
+ // Switch back to original client
+ UserGroupInformation.loginUserFromKeytab(clientPrincipal, clientKeytab.getAbsolutePath());
+ client = origClient;
+ }
+
+ // revoke
+ client.revokeTablePermission(creds, userName, tableName, TablePermission.READ);
+ assertFalse(client.hasTablePermission(creds, userName, tableName, TablePermission.READ));
+ try {
+ if (isKerberosEnabled()) {
+ // Switch back to the extra user
+ UserGroupInformation.loginUserFromKeytab(otherClient.getPrincipal(), otherClient.getKeytab().getAbsolutePath());
+ client = userClient;
+ }
+ scanner = client.createScanner(user, tableName, null);
+ client.nextK(scanner, 100);
+ fail("stooge should not read table test");
+ } catch (AccumuloSecurityException ex) {}
+
+ if (isKerberosEnabled()) {
+ // Switch back to original client
+ UserGroupInformation.loginUserFromKeytab(clientPrincipal, clientKeytab.getAbsolutePath());
+ client = origClient;
+ }
+
+ // delete user
+ client.dropLocalUser(creds, userName);
+ Set<String> users = client.listLocalUsers(creds);
+ assertFalse("Should not see user after they are deleted", users.contains(userName));
+
+ if (isKerberosEnabled()) {
+ userProxyClient.close();
+ proxyClient = origProxyClient;
+ client = origClient;
+ }
+ }
+
+ @Test
+ public void namespacePermissions() throws Exception {
+ String userName;
+ ClusterUser otherClient = null;
+ ByteBuffer password = s2bb("password");
+ ByteBuffer user;
+
+ TestProxyClient origProxyClient = null;
+ Client origClient = null;
+ TestProxyClient userProxyClient = null;
+ Client userClient = null;
+
+ if (isKerberosEnabled()) {
+ otherClient = getKdc().getClientPrincipal(1);
+ userName = otherClient.getPrincipal();
+
+ UserGroupInformation.loginUserFromKeytab(otherClient.getPrincipal(), otherClient.getKeytab().getAbsolutePath());
+ final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ // Re-login in and make a new connection. Can't use the previous one
+
+ userProxyClient = new TestProxyClient(hostname, proxyPort, factory, proxyPrimary, ugi);
+
+ origProxyClient = proxyClient;
+ origClient = client;
+ userClient = client = userProxyClient.proxy();
+
+ user = client.login(userName, Collections.<String,String> emptyMap());
+ } else {
+ userName = getUniqueNames(1)[0];
+ // create a user
+ client.createLocalUser(creds, userName, password);
+ user = client.login(userName, s2pp(ByteBufferUtil.toString(password)));
+ }
+
+ // check permission failure
+ try {
+ client.createTable(user, namespaceName + ".fail", true, TimeType.MILLIS);
+ fail("should not create the table");
+ } catch (AccumuloSecurityException ex) {
+ if (isKerberosEnabled()) {
+ // Switch back to original client
+ UserGroupInformation.loginUserFromKeytab(clientPrincipal, clientKeytab.getAbsolutePath());
+ client = origClient;
+ }
+ assertFalse(client.listTables(creds).contains(namespaceName + ".fail"));
+ }
+
+ // grant permissions and test
+ assertFalse(client.hasNamespacePermission(creds, userName, namespaceName, NamespacePermission.CREATE_TABLE));
+ client.grantNamespacePermission(creds, userName, namespaceName, NamespacePermission.CREATE_TABLE);
+ assertTrue(client.hasNamespacePermission(creds, userName, namespaceName, NamespacePermission.CREATE_TABLE));
+ if (isKerberosEnabled()) {
+ // Switch back to the extra user
+ UserGroupInformation.loginUserFromKeytab(otherClient.getPrincipal(), otherClient.getKeytab().getAbsolutePath());
+ client = userClient;
+ }
+ client.createTable(user, namespaceName + ".success", true, TimeType.MILLIS);
+ if (isKerberosEnabled()) {
+ // Switch back to original client
+ UserGroupInformation.loginUserFromKeytab(clientPrincipal, clientKeytab.getAbsolutePath());
+ client = origClient;
+ }
+ assertTrue(client.listTables(creds).contains(namespaceName + ".success"));
+
+ // revoke permissions
+ client.revokeNamespacePermission(creds, userName, namespaceName, NamespacePermission.CREATE_TABLE);
+ assertFalse(client.hasNamespacePermission(creds, userName, namespaceName, NamespacePermission.CREATE_TABLE));
+ try {
+ if (isKerberosEnabled()) {
+ // Switch back to the extra user
+ UserGroupInformation.loginUserFromKeytab(otherClient.getPrincipal(), otherClient.getKeytab().getAbsolutePath());
+ client = userClient;
+ }
+ client.createTable(user, namespaceName + ".fail", true, TimeType.MILLIS);
+ fail("should not create the table");
+ } catch (AccumuloSecurityException ex) {
+ if (isKerberosEnabled()) {
+ // Switch back to original client
+ UserGroupInformation.loginUserFromKeytab(clientPrincipal, clientKeytab.getAbsolutePath());
+ client = origClient;
+ }
+ assertFalse(client.listTables(creds).contains(namespaceName + ".fail"));
+ }
+
+ // delete user
+ client.dropLocalUser(creds, userName);
+ Set<String> users = client.listLocalUsers(creds);
+ assertFalse("Should not see user after they are deleted", users.contains(userName));
+
+ if (isKerberosEnabled()) {
+ userProxyClient.close();
+ proxyClient = origProxyClient;
+ client = origClient;
+ }
+
+ // delete table from namespace otherwise we can't delete namespace during teardown
+ client.deleteTable(creds, namespaceName + ".success");
+ }
+
+ @Test
+ public void testBatchWriter() throws Exception {
+ client.addConstraint(creds, tableName, NumericValueConstraint.class.getName());
+ // zookeeper propagation time
+ sleepUninterruptibly(ZOOKEEPER_PROPAGATION_TIME, TimeUnit.MILLISECONDS);
+
+ // Take the table offline and online to force a config update
+ client.offlineTable(creds, tableName, true);
+ client.onlineTable(creds, tableName, true);
+
+ WriterOptions writerOptions = new WriterOptions();
+ writerOptions.setLatencyMs(10000);
+ writerOptions.setMaxMemory(2);
+ writerOptions.setThreads(1);
+ writerOptions.setTimeoutMs(100000);
+
+ Map<String,Integer> constraints = client.listConstraints(creds, tableName);
+ while (!constraints.containsKey(NumericValueConstraint.class.getName())) {
+ log.info("Constraints don't contain NumericValueConstraint");
+ Thread.sleep(2000);
+ constraints = client.listConstraints(creds, tableName);
+ }
+
+ boolean success = false;
+ for (int i = 0; i < 15; i++) {
+ String batchWriter = client.createWriter(creds, tableName, writerOptions);
+ client.update(batchWriter, mutation("row1", "cf", "cq", "x"));
+ client.update(batchWriter, mutation("row1", "cf", "cq", "x"));
+ try {
+ client.flush(batchWriter);
+ log.debug("Constraint failed to fire. Waiting and retrying");
+ Thread.sleep(5000);
+ continue;
+ } catch (MutationsRejectedException ex) {}
+ try {
+ client.closeWriter(batchWriter);
+ log.debug("Constraint failed to fire. Waiting and retrying");
+ Thread.sleep(5000);
+ continue;
+ } catch (MutationsRejectedException e) {}
+ success = true;
+ break;
+ }
+
+ if (!success) {
+ fail("constraint did not fire");
+ }
+
+ client.removeConstraint(creds, tableName, 2);
+
+ // Take the table offline and online to force a config update
+ client.offlineTable(creds, tableName, true);
+ client.onlineTable(creds, tableName, true);
+
+ constraints = client.listConstraints(creds, tableName);
+ while (constraints.containsKey(NumericValueConstraint.class.getName())) {
+ log.info("Constraints still contains NumericValueConstraint");
+ Thread.sleep(2000);
+ constraints = client.listConstraints(creds, tableName);
+ }
+
+ assertScan(new String[][] {}, tableName);
+
+ sleepUninterruptibly(ZOOKEEPER_PROPAGATION_TIME, TimeUnit.MILLISECONDS);
+
+ writerOptions = new WriterOptions();
+ writerOptions.setLatencyMs(10000);
+ writerOptions.setMaxMemory(3000);
+ writerOptions.setThreads(1);
+ writerOptions.setTimeoutMs(100000);
+
+ success = false;
+ for (int i = 0; i < 15; i++) {
+ try {
+ String batchWriter = client.createWriter(creds, tableName, writerOptions);
+
+ client.update(batchWriter, mutation("row1", "cf", "cq", "x"));
+ client.flush(batchWriter);
+ client.closeWriter(batchWriter);
+ success = true;
+ break;
+ } catch (MutationsRejectedException e) {
+ log.info("Mutations were rejected, assuming constraint is still active", e);
+ Thread.sleep(5000);
+ }
+ }
+
+ if (!success) {
+ fail("Failed to successfully write data after constraint was removed");
+ }
+
+ assertScan(new String[][] {{"row1", "cf", "cq", "x"}}, tableName);
+
+ client.deleteTable(creds, tableName);
+ }
+
+ @Test
+ public void testTableConstraints() throws Exception {
+ log.debug("Setting NumericValueConstraint on " + tableName);
+
+ // constraints
+ client.addConstraint(creds, tableName, NumericValueConstraint.class.getName());
+
+ // zookeeper propagation time
+ Thread.sleep(ZOOKEEPER_PROPAGATION_TIME);
+
+ // Take the table offline and online to force a config update
+ client.offlineTable(creds, tableName, true);
+ client.onlineTable(creds, tableName, true);
+
+ log.debug("Attempting to verify client-side that constraints are observed");
+
+ Map<String,Integer> constraints = client.listConstraints(creds, tableName);
+ while (!constraints.containsKey(NumericValueConstraint.class.getName())) {
+ log.debug("Constraints don't contain NumericValueConstraint");
+ Thread.sleep(2000);
+ constraints = client.listConstraints(creds, tableName);
+ }
+
+ assertEquals(2, client.listConstraints(creds, tableName).size());
+ log.debug("Verified client-side that constraints exist");
+
+ // Write data that satisfies the constraint
+ client.updateAndFlush(creds, tableName, mutation("row1", "cf", "cq", "123"));
+
+ log.debug("Successfully wrote data that satisfies the constraint");
+ log.debug("Trying to write data that the constraint should reject");
+
+ // Expect failure on data that fails the constraint
+ while (true) {
+ try {
+ client.updateAndFlush(creds, tableName, mutation("row1", "cf", "cq", "x"));
+ log.debug("Expected mutation to be rejected, but was not. Waiting and retrying");
+ Thread.sleep(5000);
+ } catch (MutationsRejectedException ex) {
+ break;
+ }
+ }
+
+ log.debug("Saw expected failure on data which fails the constraint");
+
+ log.debug("Removing constraint from table");
+ client.removeConstraint(creds, tableName, 2);
+
+ sleepUninterruptibly(ZOOKEEPER_PROPAGATION_TIME, TimeUnit.MILLISECONDS);
+
+ // Take the table offline and online to force a config update
+ client.offlineTable(creds, tableName, true);
+ client.onlineTable(creds, tableName, true);
+
+ constraints = client.listConstraints(creds, tableName);
+ while (constraints.containsKey(NumericValueConstraint.class.getName())) {
+ log.debug("Constraints contains NumericValueConstraint");
+ Thread.sleep(2000);
+ constraints = client.listConstraints(creds, tableName);
+ }
+
+ assertEquals(1, client.listConstraints(creds, tableName).size());
+ log.debug("Verified client-side that the constraint was removed");
+
+ log.debug("Attempting to write mutation that should succeed after constraints was removed");
+ // Make sure we can write the data after we removed the constraint
+ while (true) {
+ try {
+ client.updateAndFlush(creds, tableName, mutation("row1", "cf", "cq", "x"));
+ break;
+ } catch (MutationsRejectedException ex) {
+ log.debug("Expected mutation accepted, but was not. Waiting and retrying");
+ Thread.sleep(5000);
+ }
+ }
+
+ log.debug("Verifying that record can be read from the table");
+ assertScan(new String[][] {{"row1", "cf", "cq", "x"}}, tableName);
+ }
+
+ @Test
+ public void tableMergesAndSplits() throws Exception {
+ // add some splits
+ client.addSplits(creds, tableName, new HashSet<>(Arrays.asList(s2bb("a"), s2bb("m"), s2bb("z"))));
+ List<ByteBuffer> splits = client.listSplits(creds, tableName, 1);
+ assertEquals(Arrays.asList(s2bb("m")), splits);
+
+ // Merge some of the splits away
+ client.mergeTablets(creds, tableName, null, s2bb("m"));
+ splits = client.listSplits(creds, tableName, 10);
+ assertEquals(Arrays.asList(s2bb("m"), s2bb("z")), splits);
+
+ // Merge the entire table
+ client.mergeTablets(creds, tableName, null, null);
+ splits = client.listSplits(creds, tableName, 10);
+ List<ByteBuffer> empty = Collections.emptyList();
+
+ // No splits after merge on whole table
+ assertEquals(empty, splits);
+ }
+
+ @Test
+ public void iteratorFunctionality() throws Exception {
+ // iterators
+ HashMap<String,String> options = new HashMap<>();
+ options.put("type", "STRING");
+ options.put("columns", "cf");
+ IteratorSetting setting = new IteratorSetting(10, tableName, SummingCombiner.class.getName(), options);
+ client.attachIterator(creds, tableName, setting, EnumSet.allOf(IteratorScope.class));
+ for (int i = 0; i < 10; i++) {
+ client.updateAndFlush(creds, tableName, mutation("row1", "cf", "cq", "1"));
+ }
+ // 10 updates of "1" in the value w/ SummingCombiner should return value of "10"
+ assertScan(new String[][] {{"row1", "cf", "cq", "10"}}, tableName);
+
+ try {
+ client.checkIteratorConflicts(creds, tableName, setting, EnumSet.allOf(IteratorScope.class));
+ fail("checkIteratorConflicts did not throw an exception");
+ } catch (Exception ex) {
+ // Expected
+ }
+ client.deleteRows(creds, tableName, null, null);
+ client.removeIterator(creds, tableName, "test", EnumSet.allOf(IteratorScope.class));
+ String expected[][] = new String[10][];
+ for (int i = 0; i < 10; i++) {
+ client.updateAndFlush(creds, tableName, mutation("row" + i, "cf", "cq", "" + i));
+ expected[i] = new String[] {"row" + i, "cf", "cq", "" + i};
+ client.flushTable(creds, tableName, null, null, true);
+ }
+ assertScan(expected, tableName);
+ }
+
+ @Test
+ public void cloneTable() throws Exception {
+ String TABLE_TEST2 = getUniqueNames(2)[1];
+
+ String expected[][] = new String[10][];
+ for (int i = 0; i < 10; i++) {
+ client.updateAndFlush(creds, tableName, mutation("row" + i, "cf", "cq", "" + i));
+ expected[i] = new String[] {"row" + i, "cf", "cq", "" + i};
+ client.flushTable(creds, tableName, null, null, true);
+ }
+ assertScan(expected, tableName);
+
+ // clone
+ client.cloneTable(creds, tableName, TABLE_TEST2, true, null, null);
+ assertScan(expected, TABLE_TEST2);
+ client.deleteTable(creds, TABLE_TEST2);
+ }
+
+ @Test
+ public void clearLocatorCache() throws Exception {
+ // don't know how to test this, call it just for fun
+ client.clearLocatorCache(creds, tableName);
+ }
+
+ @Test
+ public void compactTable() throws Exception {
+ String expected[][] = new String[10][];
+ for (int i = 0; i < 10; i++) {
+ client.updateAndFlush(creds, tableName, mutation("row" + i, "cf", "cq", "" + i));
+ expected[i] = new String[] {"row" + i, "cf", "cq", "" + i};
+ client.flushTable(creds, tableName, null, null, true);
+ }
+ assertScan(expected, tableName);
+
+ // compact
+ client.compactTable(creds, tableName, null, null, null, true, true, null);
+ assertEquals(1, countFiles(tableName));
+ assertScan(expected, tableName);
+ }
+
+ @Test
+ public void diskUsage() throws Exception {
+ String TABLE_TEST2 = getUniqueNames(2)[1];
+
+ // Write some data
+ String expected[][] = new String[10][];
+ for (int i = 0; i < 10; i++) {
+ client.updateAndFlush(creds, tableName, mutation("row" + i, "cf", "cq", "" + i));
+ expected[i] = new String[] {"row" + i, "cf", "cq", "" + i};
+ client.flushTable(creds, tableName, null, null, true);
+ }
+ assertScan(expected, tableName);
+
+ // compact
+ client.compactTable(creds, tableName, null, null, null, true, true, null);
+ assertEquals(1, countFiles(tableName));
+ assertScan(expected, tableName);
+
+ // Clone the table
+ client.cloneTable(creds, tableName, TABLE_TEST2, true, null, null);
+ Set<String> tablesToScan = new HashSet<>();
+ tablesToScan.add(tableName);
+ tablesToScan.add(TABLE_TEST2);
+ tablesToScan.add("foo");
+
+ client.createTable(creds, "foo", true, TimeType.MILLIS);
+
+ // get disk usage
+ List<DiskUsage> diskUsage = (client.getDiskUsage(creds, tablesToScan));
+ assertEquals(2, diskUsage.size());
+ // The original table and the clone are lumped together (they share the same files)
+ assertEquals(2, diskUsage.get(0).getTables().size());
+ // The empty table we created
+ assertEquals(1, diskUsage.get(1).getTables().size());
+
+ // Compact the clone so it writes its own files instead of referring to the original
+ client.compactTable(creds, TABLE_TEST2, null, null, null, true, true, null);
+
+ diskUsage = (client.getDiskUsage(creds, tablesToScan));
+ assertEquals(3, diskUsage.size());
+ // The original
+ assertEquals(1, diskUsage.get(0).getTables().size());
+ // The clone w/ its own files now
+ assertEquals(1, diskUsage.get(1).getTables().size());
+ // The empty table
+ assertEquals(1, diskUsage.get(2).getTables().size());
+ client.deleteTable(creds, "foo");
+ client.deleteTable(creds, TABLE_TEST2);
+ }
+
+ @Test
+ public void importExportTable() throws Exception {
+ // Write some data
+ String expected[][] = new String[10][];
+ for (int i = 0; i < 10; i++) {
+ client.updateAndFlush(creds, tableName, mutation("row" + i, "cf", "cq", "" + i));
+ expected[i] = new String[] {"row" + i, "cf", "cq", "" + i};
+ client.flushTable(creds, tableName, null, null, true);
+ }
+ assertScan(expected, tableName);
+
+ // export/import
+ MiniAccumuloClusterImpl cluster = SharedMiniClusterBase.getCluster();
+ FileSystem fs = cluster.getFileSystem();
+ Path base = cluster.getTemporaryPath();
+ Path dir = new Path(base, "test");
+ assertTrue(fs.mkdirs(dir));
+ Path destDir = new Path(base, "test_dest");
+ assertTrue(fs.mkdirs(destDir));
+ client.offlineTable(creds, tableName, false);
+ client.exportTable(creds, tableName, dir.toString());
+ // copy files to a new location
+ FSDataInputStream is = fs.open(new Path(dir, "distcp.txt"));
+ try (BufferedReader r = new BufferedReader(new InputStreamReader(is, UTF_8))) {
+ while (true) {
+ String line = r.readLine();
+ if (line == null)
+ break;
+ Path srcPath = new Path(line);
+ FileUtil.copy(fs, srcPath, fs, destDir, false, fs.getConf());
+ }
+ }
+ client.deleteTable(creds, tableName);
+ client.importTable(creds, "testify", destDir.toString());
+ assertScan(expected, "testify");
+ client.deleteTable(creds, "testify");
+
+ try {
+ // ACCUMULO-1558 a second import from the same dir should fail, the first import moved the files
+ client.importTable(creds, "testify2", destDir.toString());
+ fail();
+ } catch (Exception e) {}
+
+ assertFalse(client.listTables(creds).contains("testify2"));
+ }
+
+ @Test
+ public void localityGroups() throws Exception {
+ Map<String,Set<String>> groups = new HashMap<>();
+ groups.put("group1", Collections.singleton("cf1"));
+ groups.put("group2", Collections.singleton("cf2"));
+ client.setLocalityGroups(creds, tableName, groups);
+ assertEquals(groups, client.getLocalityGroups(creds, tableName));
+ }
+
+ @Test
+ public void tableProperties() throws Exception {
+ Map<String,String> systemProps = client.getSystemConfiguration(creds);
+ String systemTableSplitThreshold = systemProps.get("table.split.threshold");
+
+ Map<String,String> orig = client.getTableProperties(creds, tableName);
+ client.setTableProperty(creds, tableName, "table.split.threshold", "500M");
+
+ // Get the new table property value
+ Map<String,String> update = client.getTableProperties(creds, tableName);
+ assertEquals(update.get("table.split.threshold"), "500M");
+
+ // Table level properties shouldn't affect system level values
+ assertEquals(systemTableSplitThreshold, client.getSystemConfiguration(creds).get("table.split.threshold"));
+
+ client.removeTableProperty(creds, tableName, "table.split.threshold");
+ update = client.getTableProperties(creds, tableName);
+ assertEquals(orig, update);
+ }
+
+ @Test
+ public void tableRenames() throws Exception {
+ // rename table
+ Map<String,String> tables = client.tableIdMap(creds);
+ client.renameTable(creds, tableName, "bar");
+ Map<String,String> tables2 = client.tableIdMap(creds);
+ assertEquals(tables.get(tableName), tables2.get("bar"));
+ // table exists
+ assertTrue(client.tableExists(creds, "bar"));
+ assertFalse(client.tableExists(creds, tableName));
+ client.renameTable(creds, "bar", tableName);
+ }
+
+ @Test
+ public void bulkImport() throws Exception {
+ MiniAccumuloClusterImpl cluster = SharedMiniClusterBase.getCluster();
+ FileSystem fs = cluster.getFileSystem();
+ Path base = cluster.getTemporaryPath();
+ Path dir = new Path(base, "test");
+ assertTrue(fs.mkdirs(dir));
+
+ // Write an RFile
+ String filename = dir + "/bulk/import/rfile.rf";
+ FileSKVWriter writer = FileOperations.getInstance().newWriterBuilder().forFile(filename, fs, fs.getConf())
+ .withTableConfiguration(DefaultConfiguration.getInstance()).build();
+ writer.startDefaultLocalityGroup();
+ writer.append(new org.apache.accumulo.core.data.Key(new Text("a"), new Text("b"), new Text("c")), new Value("value".getBytes(UTF_8)));
+ writer.close();
+
+ // Create failures directory
+ fs.mkdirs(new Path(dir + "/bulk/fail"));
+
+ // Run the bulk import
+ client.importDirectory(creds, tableName, dir + "/bulk/import", dir + "/bulk/fail", true);
+
+ // Make sure we find the data
+ String scanner = client.createScanner(creds, tableName, null);
+ ScanResult more = client.nextK(scanner, 100);
+ client.closeScanner(scanner);
+ assertEquals(1, more.results.size());
+ ByteBuffer maxRow = client.getMaxRow(creds, tableName, null, null, false, null, false);
+ assertEquals(s2bb("a"), maxRow);
+ }
+
+ @Test
+ public void testTableClassLoad() throws Exception {
+ assertFalse(client.testTableClassLoad(creds, tableName, "abc123", SortedKeyValueIterator.class.getName()));
+ assertTrue(client.testTableClassLoad(creds, tableName, VersioningIterator.class.getName(), SortedKeyValueIterator.class.getName()));
+ }
+
+ private Condition newCondition(String cf, String cq) {
+ return new Condition(new Column(s2bb(cf), s2bb(cq), s2bb("")));
+ }
+
+ private Condition newCondition(String cf, String cq, String val) {
+ return newCondition(cf, cq).setValue(s2bb(val));
+ }
+
+ private Condition newCondition(String cf, String cq, long ts, String val) {
+ return newCondition(cf, cq).setValue(s2bb(val)).setTimestamp(ts);
+ }
+
+ private ColumnUpdate newColUpdate(String cf, String cq, String val) {
+ return new ColumnUpdate(s2bb(cf), s2bb(cq)).setValue(s2bb(val));
+ }
+
+ private ColumnUpdate newColUpdate(String cf, String cq, long ts, String val) {
+ return new ColumnUpdate(s2bb(cf), s2bb(cq)).setTimestamp(ts).setValue(s2bb(val));
+ }
+
+ private void assertScan(String[][] expected, String table) throws Exception {
+ String scid = client.createScanner(creds, table, new ScanOptions());
+ ScanResult keyValues = client.nextK(scid, expected.length + 1);
+
+ assertEquals("Saw " + keyValues.results, expected.length, keyValues.results.size());
+ assertFalse(keyValues.more);
+
+ for (int i = 0; i < keyValues.results.size(); i++) {
+ checkKey(expected[i][0], expected[i][1], expected[i][2], expected[i][3], keyValues.results.get(i));
+ }
+
+ client.closeScanner(scid);
+ }
+
+ @Test
+ public void testConditionalWriter() throws Exception {
+ log.debug("Adding constraint {} to {}", tableName, NumericValueConstraint.class.getName());
+ client.addConstraint(creds, tableName, NumericValueConstraint.class.getName());
+ sleepUninterruptibly(ZOOKEEPER_PROPAGATION_TIME, TimeUnit.MILLISECONDS);
+
+ // Take the table offline and online to force a config update
+ client.offlineTable(creds, tableName, true);
+ client.onlineTable(creds, tableName, true);
+
+ while (!client.listConstraints(creds, tableName).containsKey(NumericValueConstraint.class.getName())) {
+ log.info("Failed to see constraint");
+ Thread.sleep(1000);
+ }
+
+ String cwid = client.createConditionalWriter(creds, tableName, new ConditionalWriterOptions());
+
+ Map<ByteBuffer,ConditionalUpdates> updates = new HashMap<>();
+
+ updates.put(
+ s2bb("00345"),
+ new ConditionalUpdates(Arrays.asList(newCondition("meta", "seq")), Arrays.asList(newColUpdate("meta", "seq", 10, "1"),
+ newColUpdate("data", "img", "73435435"))));
+
+ Map<ByteBuffer,ConditionalStatus> results = client.updateRowsConditionally(cwid, updates);
+
+ assertEquals(1, results.size());
+ assertEquals(ConditionalStatus.ACCEPTED, results.get(s2bb("00345")));
+
+ assertScan(new String[][] { {"00345", "data", "img", "73435435"}, {"00345", "meta", "seq", "1"}}, tableName);
+
+ // test not setting values on conditions
+ updates.clear();
+
+ updates.put(s2bb("00345"), new ConditionalUpdates(Arrays.asList(newCondition("meta", "seq")), Arrays.asList(newColUpdate("meta", "seq", "2"))));
+ updates.put(s2bb("00346"), new ConditionalUpdates(Arrays.asList(newCondition("meta", "seq")), Arrays.asList(newColUpdate("meta", "seq", "1"))));
+
+ results = client.updateRowsConditionally(cwid, updates);
+
+ assertEquals(2, results.size());
+ assertEquals(ConditionalStatus.REJECTED, results.get(s2bb("00345")));
+ assertEquals(ConditionalStatus.ACCEPTED, results.get(s2bb("00346")));
+
+ assertScan(new String[][] { {"00345", "data", "img", "73435435"}, {"00345", "meta", "seq", "1"}, {"00346", "meta", "seq", "1"}}, tableName);
+
+ // test setting values on conditions
+ updates.clear();
+
+ updates.put(
+ s2bb("00345"),
+ new ConditionalUpdates(Arrays.asList(newCondition("meta", "seq", "1")), Arrays.asList(newColUpdate("meta", "seq", 20, "2"),
+ newColUpdate("data", "img", "567890"))));
+
+ updates.put(s2bb("00346"), new ConditionalUpdates(Arrays.asList(newCondition("meta", "seq", "2")), Arrays.asList(newColUpdate("meta", "seq", "3"))));
+
+ results = client.updateRowsConditionally(cwid, updates);
+
+ assertEquals(2, results.size());
+ assertEquals(ConditionalStatus.ACCEPTED, results.get(s2bb("00345")));
+ assertEquals(ConditionalStatus.REJECTED, results.get(s2bb("00346")));
+
+ assertScan(new String[][] { {"00345", "data", "img", "567890"}, {"00345", "meta", "seq", "2"}, {"00346", "meta", "seq", "1"}}, tableName);
+
+ // test setting timestamp on condition to a non-existant version
+ updates.clear();
+
+ updates.put(
+ s2bb("00345"),
+ new ConditionalUpdates(Arrays.asList(newCondition("meta", "seq", 10, "2")), Arrays.asList(newColUpdate("meta", "seq", 30, "3"),
+ newColUpdate("data", "img", "1234567890"))));
+
+ results = client.updateRowsConditionally(cwid, updates);
+
+ assertEquals(1, results.size());
+ assertEquals(ConditionalStatus.REJECTED, results.get(s2bb("00345")));
+
+ assertScan(new String[][] { {"00345", "data", "img", "567890"}, {"00345", "meta", "seq", "2"}, {"00346", "meta", "seq", "1"}}, tableName);
+
+ // test setting timestamp to an existing version
+
+ updates.clear();
+
+ updates.put(
+ s2bb("00345"),
+ new ConditionalUpdates(Arrays.asList(newCondition("meta", "seq", 20, "2")), Arrays.asList(newColUpdate("meta", "seq", 30, "3"),
+ newColUpdate("data", "img", "1234567890"))));
+
+ results = client.updateRowsConditionally(cwid, updates);
+
+ assertEquals(1, results.size());
+ assertEquals(ConditionalStatus.ACCEPTED, results.get(s2bb("00345")));
+
+ assertScan(new String[][] { {"00345", "data", "img", "1234567890"}, {"00345", "meta", "seq", "3"}, {"00346", "meta", "seq", "1"}}, tableName);
+
+ // run test w/ condition that has iterators
+ // following should fail w/o iterator
+ client.updateAndFlush(creds, tableName, Collections.singletonMap(s2bb("00347"), Arrays.asList(newColUpdate("data", "count", "1"))));
+ client.updateAndFlush(creds, tableName, Collections.singletonMap(s2bb("00347"), Arrays.asList(newColUpdate("data", "count", "1"))));
+ client.updateAndFlush(creds, tableName, Collections.singletonMap(s2bb("00347"), Arrays.asList(newColUpdate("data", "count", "1"))));
+
+ updates.clear();
+ updates.put(s2bb("00347"),
+ new ConditionalUpdates(Arrays.asList(newCondition("data", "count", "3")), Arrays.asList(newColUpdate("data", "img", "1234567890"))));
+
+ results = client.updateRowsConditionally(cwid, updates);
+
+ assertEquals(1, results.size());
+ assertEquals(ConditionalStatus.REJECTED, results.get(s2bb("00347")));
+
+ assertScan(new String[][] { {"00345", "data", "img", "1234567890"}, {"00345", "meta", "seq", "3"}, {"00346", "meta", "seq", "1"},
+ {"00347", "data", "count", "1"}}, tableName);
+
+ // following test w/ iterator setup should succeed
+ Condition iterCond = newCondition("data", "count", "3");
+ Map<String,String> props = new HashMap<>();
+ props.put("type", "STRING");
+ props.put("columns", "data:count");
+ IteratorSetting is = new IteratorSetting(1, "sumc", SummingCombiner.class.getName(), props);
+ iterCond.setIterators(Arrays.asList(is));
+
+ updates.clear();
+ updates.put(s2bb("00347"), new ConditionalUpdates(Arrays.asList(iterCond), Arrays.asList(newColUpdate("data", "img", "1234567890"))));
+
+ results = client.updateRowsConditionally(cwid, updates);
+
+ assertEquals(1, results.size());
+ assertEquals(ConditionalStatus.ACCEPTED, results.get(s2bb("00347")));
+
+ assertScan(new String[][] { {"00345", "data", "img", "1234567890"}, {"00345", "meta", "seq", "3"}, {"00346", "meta", "seq", "1"},
+ {"00347", "data", "count", "1"}, {"00347", "data", "img", "1234567890"}}, tableName);
+
+ ConditionalStatus status = null;
+ for (int i = 0; i < 30; i++) {
+ // test a mutation that violated a constraint
+ updates.clear();
+ updates.put(s2bb("00347"),
+ new ConditionalUpdates(Arrays.asList(newCondition("data", "img", "1234567890")), Arrays.asList(newColUpdate("data", "count", "A"))));
+
+ results = client.updateRowsConditionally(cwid, updates);
+
+ assertEquals(1, results.size());
+ status = results.get(s2bb("00347"));
+ if (ConditionalStatus.VIOLATED != status) {
+ log.info("ConditionalUpdate was not rejected by server due to table constraint. Sleeping and retrying");
+ Thread.sleep(5000);
+ continue;
+ }
+
+ assertEquals(ConditionalStatus.VIOLATED, status);
+ break;
+ }
+
+ // Final check to make sure we succeeded and didn't exceed the retries
+ assertEquals(ConditionalStatus.VIOLATED, status);
+
+ assertScan(new String[][] { {"00345", "data", "img", "1234567890"}, {"00345", "meta", "seq", "3"}, {"00346", "meta", "seq", "1"},
+ {"00347", "data", "count", "1"}, {"00347", "data", "img", "1234567890"}}, tableName);
+
+ // run test with two conditions
+ // both conditions should fail
+ updates.clear();
+ updates.put(
+ s2bb("00347"),
+ new ConditionalUpdates(Arrays.asList(newCondition("data", "img", "565"), newCondition("data", "count", "2")), Arrays.asList(
+ newColUpdate("data", "count", "3"), newColUpdate("data", "img", "0987654321"))));
+
+ results = client.updateRowsConditionally(cwid, updates);
+
+ assertEquals(1, results.size());
+ assertEquals(ConditionalStatus.REJECTED, results.get(s2bb("00347")));
+
+ assertScan(new String[][] { {"00345", "data", "img", "1234567890"}, {"00345", "meta", "seq", "3"}, {"00346", "meta", "seq", "1"},
+ {"00347", "data", "count", "1"}, {"00347", "data", "img", "1234567890"}}, tableName);
+
+ // one condition should fail
+ updates.clear();
+ updates.put(
+ s2bb("00347"),
+ new ConditionalUpdates(Arrays.asList(newCondition("data", "img", "1234567890"), newCondition("data", "count", "2")), Arrays.asList(
+ newColUpdate("data", "count", "3"), newColUpdate("data", "img", "0987654321"))));
+
+ results = client.updateRowsConditionally(cwid, updates);
+
+ assertEquals(1, results.size());
+ assertEquals(ConditionalStatus.REJECTED, results.get(s2bb("00347")));
+
+ assertScan(new String[][] { {"00345", "data", "img", "1234567890"}, {"00345", "meta", "seq", "3"}, {"00346", "meta", "seq", "1"},
+ {"00347", "data", "count", "1"}, {"00347", "data", "img", "1234567890"}}, tableName);
+
+ // one condition should fail
+ updates.clear();
+ updates.put(
+ s2bb("00347"),
+ new ConditionalUpdates(Arrays.asList(newCondition("data", "img", "565"), newCondition("data", "count", "1")), Arrays.asList(
+ newColUpdate("data", "count", "3"), newColUpdate("data", "img", "0987654321"))));
+
+ results = client.updateRowsConditionally(cwid, updates);
+
+ assertEquals(1, results.size());
+ assertEquals(ConditionalStatus.REJECTED, results.get(s2bb("00347")));
+
+ assertScan(new String[][] { {"00345", "data", "img", "1234567890"}, {"00345", "meta", "seq", "3"}, {"00346", "meta", "seq", "1"},
+ {"00347", "data", "count", "1"}, {"00347", "data", "img", "1234567890"}}, tableName);
+
+ // both conditions should succeed
+
+ ConditionalStatus result = client.updateRowConditionally(
+ creds,
+ tableName,
+ s2bb("00347"),
+ new ConditionalUpdates(Arrays.asList(newCondition("data", "img", "1234567890"), newCondition("data", "count", "1")), Arrays.asList(
+ newColUpdate("data", "count", "3"), newColUpdate("data", "img", "0987654321"))));
+
+ assertEquals(ConditionalStatus.ACCEPTED, result);
+
+ assertScan(new String[][] { {"00345", "data", "img", "1234567890"}, {"00345", "meta", "seq", "3"}, {"00346", "meta", "seq", "1"},
+ {"00347", "data", "count", "3"}, {"00347", "data", "img", "0987654321"}}, tableName);
+
+ client.closeConditionalWriter(cwid);
+ try {
+ client.updateRowsConditionally(cwid, updates);
+ fail("conditional writer not closed");
+ } catch (UnknownWriter uk) {}
+
+ String principal;
+ ClusterUser cwuser = null;
+ if (isKerberosEnabled()) {
+ cwuser = getKdc().getClientPrincipal(1);
+ principal = cwuser.getPrincipal();
+ client.createLocalUser(creds, principal, s2bb("unused"));
+
+ } else {
+ principal = "cwuser";
+ // run test with colvis
+ client.createLocalUser(creds, principal, s2bb("bestpasswordever"));
+ }
+
+ client.changeUserAuthorizations(creds, principal, Collections.singleton(s2bb("A")));
+ client.grantTablePermission(creds, principal, tableName, TablePermission.WRITE);
+ client.grantTablePermission(creds, principal, tableName, TablePermission.READ);
+
+ TestProxyClient cwuserProxyClient = null;
+ Client origClient = null;
+ Map<String,String> cwProperties;
+ if (isKerberosEnabled()) {
+ UserGroupInformation.loginUserFromKeytab(cwuser.getPrincipal(), cwuser.getKeytab().getAbsolutePath());
+ final UserGroupInformation cwuserUgi = UserGroupInformation.getCurrentUser();
+ // Re-login in and make a new connection. Can't use the previous one
+ cwuserProxyClient = new TestProxyClient(hostname, proxyPort, factory, proxyPrimary, cwuserUgi);
+ origClient = client;
+ client = cwuserProxyClient.proxy();
+ cwProperties = Collections.emptyMap();
+ } else {
+ cwProperties = Collections.singletonMap("password", "bestpasswordever");
+ }
+
+ try {
+ ByteBuffer cwCreds = client.login(principal, cwProperties);
+
+ cwid = client.createConditionalWriter(cwCreds, tableName, new ConditionalWriterOptions().setAuthorizations(Collections.singleton(s2bb("A"))));
+
+ updates.clear();
+ updates.put(
+ s2bb("00348"),
+ new ConditionalUpdates(Arrays.asList(new Condition(new Column(s2bb("data"), s2bb("c"), s2bb("A")))), Arrays.asList(newColUpdate("data", "seq", "1"),
+ newColUpdate("data", "c", "1").setColVisibility(s2bb("A")))));
+ updates
+ .put(
+ s2bb("00349"),
+ new ConditionalUpdates(Arrays.asList(new Condition(new Column(s2bb("data"), s2bb("c"), s2bb("B")))), Arrays.asList(newColUpdate("data", "seq",
+ "1"))));
+
+ results = client.updateRowsConditionally(cwid, updates);
+
+ assertEquals(2, results.size());
+ assertEquals(ConditionalStatus.ACCEPTED, results.get(s2bb("00348")));
+ assertEquals(ConditionalStatus.INVISIBLE_VISIBILITY, results.get(s2bb("00349")));
+
+ if (isKerberosEnabled()) {
+ UserGroupInformation.loginUserFromKeytab(clientPrincipal, clientKeytab.getAbsolutePath());
+ client = origClient;
+ }
+ // Verify that the original user can't see the updates with visibilities set
+ assertScan(new String[][] { {"00345", "data", "img", "1234567890"}, {"00345", "meta", "seq", "3"}, {"00346", "meta", "seq", "1"},
+ {"00347", "data", "count", "3"}, {"00347", "data", "img", "0987654321"}, {"00348", "data", "seq", "1"}}, tableName);
+
+ if (isKerberosEnabled()) {
+ UserGroupInformation.loginUserFromKeytab(cwuser.getPrincipal(), cwuser.getKeytab().getAbsolutePath());
+ client = cwuserProxyClient.proxy();
+ }
+
+ updates.clear();
+
+ updates.clear();
+ updates.put(s2bb("00348"), new ConditionalUpdates(Arrays.asList(new Condition(new Column(s2bb("data"), s2bb("c"), s2bb("A"))).setValue(s2bb("0"))),
+ Arrays.asList(newColUpdate("data", "seq", "2"), newColUpdate("data", "c", "2").setColVisibility(s2bb("A")))));
+
+ results = client.updateRowsConditionally(cwid, updates);
+
+ assertEquals(1, results.size());
+ assertEquals(ConditionalStatus.REJECTED, results.get(s2bb("00348")));
+
+ if (isKerberosEnabled()) {
+ UserGroupInformation.loginUserFromKeytab(clientPrincipal, clientKeytab.getAbsolutePath());
+ client = origClient;
+ }
+
+ // Same results as the original user
+ assertScan(new String[][] { {"00345", "data", "img", "1234567890"}, {"00345", "meta", "seq", "3"}, {"00346", "meta", "seq", "1"},
+ {"00347", "data", "count", "3"}, {"00347", "data", "img", "0987654321"}, {"00348", "data", "seq", "1"}}, tableName);
+
+ if (isKerberosEnabled()) {
+ UserGroupInformation.loginUserFromKeytab(cwuser.getPrincipal(), cwuser.getKeytab().getAbsolutePath());
+ client = cwuserProxyClient.proxy();
+ }
+
+ updates.clear();
+ updates.put(s2bb("00348"), new ConditionalUpdates(Arrays.asList(new Condition(new Column(s2bb("data"), s2bb("c"), s2bb("A"))).setValue(s2bb("1"))),
+ Arrays.asList(newColUpdate("data", "seq", "2"), newColUpdate("data", "c", "2").setColVisibility(s2bb("A")))));
+
+ results = client.updateRowsConditionally(cwid, updates);
+
+ assertEquals(1, results.size());
+ assertEquals(ConditionalStatus.ACCEPTED, results.get(s2bb("00348")));
+
+ if (isKerberosEnabled()) {
+ UserGroupInformation.loginUserFromKeytab(clientPrincipal, clientKeytab.getAbsolutePath());
+ client = origClient;
+ }
+
+ assertScan(new String[][] { {"00345", "data", "img", "1234567890"}, {"00345", "meta", "seq", "3"}, {"00346", "meta", "seq", "1"},
+ {"00347", "data", "count", "3"}, {"00347", "data", "img", "0987654321"}, {"00348", "data", "seq", "2"}}, tableName);
+
+ if (isKerberosEnabled()) {
+ UserGroupInformation.loginUserFromKeytab(cwuser.getPrincipal(), cwuser.getKeytab().getAbsolutePath());
+ client = cwuserProxyClient.proxy();
+ }
+
+ client.closeConditionalWriter(cwid);
+ try {
+ client.updateRowsConditionally(cwid, updates);
+ fail("conditional writer not closed");
+ } catch (UnknownWriter uk) {}
+ } finally {
+ if (isKerberosEnabled()) {
+ // Close the other client
+ if (null != cwuserProxyClient) {
+ cwuserProxyClient.close();
+ }
+
+ UserGroupInformation.loginUserFromKeytab(clientPrincipal, clientKeytab.getAbsolutePath());
+ // Re-login and restore the original client
+ client = origClient;
+ }
+ client.dropLocalUser(creds, principal);
+ }
+ }
+
+ private void checkKey(String row, String cf, String cq, String val, KeyValue keyValue) {
+ assertEquals(row, ByteBufferUtil.toString(keyValue.key.row));
+ assertEquals(cf, ByteBufferUtil.toString(keyValue.key.colFamily));
+ assertEquals(cq, ByteBufferUtil.toString(keyValue.key.colQualifier));
+ assertEquals("", ByteBufferUtil.toString(keyValue.key.colVisibility));
+ assertEquals(val, ByteBufferUtil.toString(keyValue.value));
+ }
+
+ // scan metadata for file entries for the given table
+ private int countFiles(String table) throws Exception {
+ Map<String,String> tableIdMap = client.tableIdMap(creds);
+ String tableId = tableIdMap.get(table);
+ Key start = new Key();
+ start.row = s2bb(tableId + ";");
+ Key end = new Key();
+ end.row = s2bb(tableId + "<");
+ end = client.getFollowing(end, PartialKey.ROW);
+ ScanOptions opt = new ScanOptions();
+ opt.range = new Range(start, true, end, false);
+ opt.columns = Collections.singletonList(new ScanColumn(s2bb("file")));
+ String scanner = client.createScanner(creds, MetadataTable.NAME, opt);
+ int result = 0;
+ while (true) {
+ ScanResult more = client.nextK(scanner, 100);
+ result += more.getResults().size();
+ if (!more.more)
+ break;
+ }
+ return result;
+ }
+
+ private Map<ByteBuffer,List<ColumnUpdate>> mutation(String row, String cf, String cq, String value) {
+ ColumnUpdate upd = new ColumnUpdate(s2bb(cf), s2bb(cq));
+ upd.setValue(value.getBytes(UTF_8));
+ return Collections.singletonMap(s2bb(row), Collections.singletonList(upd));
+ }
+
+ private ByteBuffer s2bb(String cf) {
+ return ByteBuffer.wrap(cf.getBytes(UTF_8));
+ }
+
+ private Map<String,String> s2pp(String cf) {
+ Map<String,String> toRet = new TreeMap<>();
+ toRet.put("password", cf);
+ return toRet;
+ }
+
+ static private ByteBuffer t2bb(Text t) {
+ return ByteBuffer.wrap(t.getBytes());
+ }
+
+ @Test
+ public void testGetRowRange() throws Exception {
+ Range range = client.getRowRange(s2bb("xyzzy"));
+ org.apache.accumulo.core.data.Range range2 = new org.apache.accumulo.core.data.Range(new Text("xyzzy"));
+ assertEquals(0, range.start.row.compareTo(t2bb(range2.getStartKey().getRow())));
+ assertEquals(0, range.stop.row.compareTo(t2bb(range2.getEndKey().getRow())));
+ assertEquals(range.startInclusive, range2.isStartKeyInclusive());
+ assertEquals(range.stopInclusive, range2.isEndKeyInclusive());
+ assertEquals(0, range.start.colFamily.compareTo(t2bb(range2.getStartKey().getColumnFamily())));
+ assertEquals(0, range.start.colQualifier.compareTo(t2bb(range2.getStartKey().getColumnQualifier())));
+ assertEquals(0, range.stop.colFamily.compareTo(t2bb(range2.getEndKey().getColumnFamily())));
+ assertEquals(0, range.stop.colQualifier.compareTo(t2bb(range2.getEndKey().getColumnQualifier())));
+ assertEquals(range.start.timestamp, range.start.timestamp);
+ assertEquals(range.stop.timestamp, range.stop.timestamp);
+ }
+
+ @Test
+ public void testCompactionStrategy() throws Exception {
+ File jarDir = new File(System.getProperty("user.dir"), "target");
+ assertTrue(jarDir.mkdirs() || jarDir.isDirectory());
+ File jarFile = new File(jarDir, "TestCompactionStrat.jar");
+ FileUtils.copyInputStreamToFile(Class.class.getResourceAsStream("/TestCompactionStrat.jar"), jarFile);
+ client.setProperty(creds, Property.VFS_CONTEXT_CLASSPATH_PROPERTY.getKey() + "context1", jarFile.toString());
+ client.setTableProperty(creds, tableName, Property.TABLE_CLASSPATH.getKey(), "context1");
+
+ client.addSplits(creds, tableName, Collections.singleton(s2bb("efg")));
+
+ client.updateAndFlush(creds, tableName, mutation("a", "cf", "cq", "v1"));
+ client.flushTable(creds, tableName, null, null, true);
+
+ client.updateAndFlush(creds, tableName, mutation("b", "cf", "cq", "v2"));
+ client.flushTable(creds, tableName, null, null, true);
+
+ client.updateAndFlush(creds, tableName, mutation("y", "cf", "cq", "v1"));
+ client.flushTable(creds, tableName, null, null, true);
+
+ client.updateAndFlush(creds, tableName, mutation("z", "cf", "cq", "v2"));
+ client.flushTable(creds, tableName, null, null, true);
+
+ assertEquals(4, countFiles(tableName));
+
+ CompactionStrategyConfig csc = new CompactionStrategyConfig();
+
+ // The EfgCompactionStrat will only compact tablets with and end row of efg
+ csc.setClassName("org.apache.accumulo.test.EfgCompactionStrat");
+
+ client.compactTable(creds, tableName, null, null, null, true, true, csc);
+
+ assertEquals(3, countFiles(tableName));
+ }
+
+ @Test
+ public void namespaceOperations() throws Exception {
+ // default namespace and accumulo namespace
+ assertEquals("System namespace is wrong", client.systemNamespace(), Namespaces.ACCUMULO_NAMESPACE);
+ assertEquals("Default namespace is wrong", client.defaultNamespace(), Namespaces.DEFAULT_NAMESPACE);
+
+ // namespace existance and namespace listing
+ assertTrue("Namespace created during setup should exist", client.namespaceExists(creds, namespaceName));
+ assertTrue("Namespace listing should contain namespace created during setup", client.listNamespaces(creds).contains(namespaceName));
+
+ // create new namespace
+ String newNamespace = "foobar";
+ client.createNamespace(creds, newNamespace);
+
+ assertTrue("Namespace just created should exist", client.namespaceExists(creds, newNamespace));
+ assertTrue("Namespace listing should contain just created", client.listNamespaces(creds).contains(newNamespace));
+
+ // rename the namespace
+ String renamedNamespace = "foobar_renamed";
+ client.renameNamespace(creds, newNamespace, renamedNamespace);
+
+ assertTrue("Renamed namespace should exist", client.namespaceExists(creds, renamedNamespace));
+ assertTrue("Namespace listing should contain renamed namespace", client.listNamespaces(creds).contains(renamedNamespace));
+
+ assertFalse("Original namespace should no longer exist", client.namespaceExists(creds, newNamespace));
+ assertFalse("Namespace listing should no longer contain original namespace", client.listNamespaces(creds).contains(newNamespace));
+
+ // delete the namespace
+ client.deleteNamespace(creds, renamedNamespace);
+ assertFalse("Renamed namespace should no longer exist", client.namespaceExists(creds, renamedNamespace));
+ assertFalse("Namespace listing should no longer contain renamed namespace", client.listNamespaces(creds).contains(renamedNamespace));
+
+ // namespace properties
+ Map<String,String> cfg = client.getNamespaceProperties(creds, namespaceName);
+ String defaultProp = cfg.get("table.compaction.major.ratio");
+ assertNotEquals(defaultProp, "10"); // let's make sure we are setting this value to something different than default...
+ client.setNamespaceProperty(creds, namespaceName, "table.compaction.major.ratio", "10");
+ for (int i = 0; i < 5; i++) {
+ cfg = client.getNamespaceProperties(creds, namespaceName);
+ if ("10".equals(cfg.get("table.compaction.major.ratio"))) {
+ break;
+ }
+ sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ }
+ assertTrue("Namespace should contain table.compaction.major.ratio property",
+ client.getNamespaceProperties(creds, namespaceName).containsKey("table.compaction.major.ratio"));
+ assertEquals("Namespace property table.compaction.major.ratio property should equal 10",
+ client.getNamespaceProperties(creds, namespaceName).get("table.compaction.major.ratio"), "10");
+ client.removeNamespaceProperty(creds, namespaceName, "table.compaction.major.ratio");
+ for (int i = 0; i < 5; i++) {
+ cfg = client.getNamespaceProperties(creds, namespaceName);
+ if (!defaultProp.equals(cfg.get("table.compaction.major.ratio"))) {
+ break;
+ }
+ sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ }
+ assertEquals("Namespace should have default value for table.compaction.major.ratio", defaultProp, cfg.get("table.compaction.major.ratio"));
+
+ // namespace ID map
+ assertTrue("Namespace ID map should contain accumulo", client.namespaceIdMap(creds).containsKey("accumulo"));
+ assertTrue("Namespace ID map should contain namespace created during setup", client.namespaceIdMap(creds).containsKey(namespaceName));
+
+ // namespace iterators
+ IteratorSetting setting = new IteratorSetting(100, "DebugTheThings", DebugIterator.class.getName(), Collections.<String,String> emptyMap());
+ client.attachNamespaceIterator(creds, namespaceName, setting, EnumSet.of(IteratorScope.SCAN));
+ assertEquals("Wrong iterator setting returned", setting, client.getNamespaceIteratorSetting(creds, namespaceName, "DebugTheThings", IteratorScope.SCAN));
+ assertTrue("Namespace iterator settings should contain iterator just added",
+ client.listNamespaceIterators(creds, namespaceName).containsKey("DebugTheThings"));
+ assertEquals("Namespace iterator listing should contain iterator scope just added", EnumSet.of(IteratorScope.SCAN),
+ client.listNamespaceIterators(creds, namespaceName).get("DebugTheThings"));
+ client.checkNamespaceIteratorConflicts(creds, namespaceName, setting, EnumSet.of(IteratorScope.MAJC));
+ client.removeNamespaceIterator(creds, namespaceName, "DebugTheThings", EnumSet.of(IteratorScope.SCAN));
+ assertFalse("Namespace iterator settings should contain iterator just added",
+ client.listNamespaceIterators(creds, namespaceName).containsKey("DebugTheThings"));
+
+ // namespace constraints
+ int id = client.addNamespaceConstraint(creds, namespaceName, MaxMutationSize.class.getName());
+ assertTrue("Namespace should contain max mutation size constraint",
+ client.listNamespaceConstraints(creds, namespaceName).containsKey(MaxMutationSize.class.getName()));
+ assertEquals("Namespace max mutation size constraint id is wrong", id,
+ (int) client.listNamespaceConstraints(creds, namespaceName).get(MaxMutationSize.class.getName()));
+ client.removeNamespaceConstraint(creds, namespaceName, id);
+ assertFalse("Namespace should no longer contain max mutation size constraint",
+ client.listNamespaceConstraints(creds, namespaceName).containsKey(MaxMutationSize.class.getName()));
+
+ // namespace class load
+ assertTrue("Namespace class load should work",
+ client.testNamespaceClassLoad(creds, namespaceName, DebugIterator.class.getName(), SortedKeyValueIterator.class.getName()));
+ assertFalse("Namespace class load should not work", client.testNamespaceClassLoad(creds, namespaceName, "foo.bar", SortedKeyValueIterator.class.getName()));
+ }
+}
diff --cc test/src/main/java/org/apache/accumulo/test/proxy/TestProxyInstanceOperations.java
index ff94dd4,0000000..8bc9bf5
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyInstanceOperations.java
+++ b/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyInstanceOperations.java
@@@ -1,84 -1,0 +1,83 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.proxy;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Properties;
+
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
++import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.proxy.Proxy;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.server.TServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
- import com.google.common.net.HostAndPort;
-
+public class TestProxyInstanceOperations {
+ private static final Logger log = LoggerFactory.getLogger(TestProxyInstanceOperations.class);
+
+ protected static TServer proxy;
+ protected static TestProxyClient tpc;
+ protected static ByteBuffer userpass;
+ protected static final int port = 10197;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ Properties prop = new Properties();
+ prop.setProperty("useMockInstance", "true");
+ prop.put("tokenClass", PasswordToken.class.getName());
+
+ proxy = Proxy.createProxyServer(HostAndPort.fromParts("localhost", port), new TCompactProtocol.Factory(), prop).server;
+ log.info("Waiting for proxy to start");
+ while (!proxy.isServing()) {
+ Thread.sleep(500);
+ }
+ log.info("Proxy started");
+ tpc = new TestProxyClient("localhost", port);
+ userpass = tpc.proxy.login("root", Collections.singletonMap("password", ""));
+ }
+
+ @AfterClass
+ public static void tearDown() throws InterruptedException {
+ proxy.stop();
+ }
+
+ @Test
+ public void properties() throws TException {
+ tpc.proxy().setProperty(userpass, "test.systemprop", "whistletips");
+
+ assertEquals(tpc.proxy().getSystemConfiguration(userpass).get("test.systemprop"), "whistletips");
+ tpc.proxy().removeProperty(userpass, "test.systemprop");
+ assertNull(tpc.proxy().getSystemConfiguration(userpass).get("test.systemprop"));
+
+ }
+
+ @Test
+ public void testClassLoad() throws TException {
+ assertTrue(tpc.proxy().testClassLoad(userpass, "org.apache.accumulo.core.iterators.user.RegExFilter", "org.apache.accumulo.core.iterators.Filter"));
+ }
+
+}
diff --cc test/src/main/java/org/apache/accumulo/test/proxy/TestProxyReadWrite.java
index 764a08c,0000000..34ffdb4
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyReadWrite.java
+++ b/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyReadWrite.java
@@@ -1,468 -1,0 +1,467 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.proxy;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.iterators.user.RegExFilter;
++import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.proxy.Proxy;
+import org.apache.accumulo.proxy.Util;
+import org.apache.accumulo.proxy.thrift.BatchScanOptions;
+import org.apache.accumulo.proxy.thrift.ColumnUpdate;
+import org.apache.accumulo.proxy.thrift.IteratorSetting;
+import org.apache.accumulo.proxy.thrift.Key;
+import org.apache.accumulo.proxy.thrift.KeyValue;
+import org.apache.accumulo.proxy.thrift.Range;
+import org.apache.accumulo.proxy.thrift.ScanColumn;
+import org.apache.accumulo.proxy.thrift.ScanOptions;
+import org.apache.accumulo.proxy.thrift.ScanResult;
+import org.apache.accumulo.proxy.thrift.TimeType;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.server.TServer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
- import com.google.common.net.HostAndPort;
-
+public class TestProxyReadWrite {
+ protected static TServer proxy;
+ protected static TestProxyClient tpc;
+ protected static ByteBuffer userpass;
+ protected static final int port = 10194;
+ protected static final String testtable = "testtable";
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ Properties prop = new Properties();
+ prop.setProperty("useMockInstance", "true");
+ prop.put("tokenClass", PasswordToken.class.getName());
+
+ proxy = Proxy.createProxyServer(HostAndPort.fromParts("localhost", port), new TCompactProtocol.Factory(), prop).server;
+ tpc = new TestProxyClient("localhost", port);
+ userpass = tpc.proxy().login("root", Collections.singletonMap("password", ""));
+ }
+
+ @AfterClass
+ public static void tearDown() throws InterruptedException {
+ proxy.stop();
+ }
+
+ @Before
+ public void makeTestTable() throws Exception {
+ tpc.proxy().createTable(userpass, testtable, true, TimeType.MILLIS);
+ }
+
+ @After
+ public void deleteTestTable() throws Exception {
+ tpc.proxy().deleteTable(userpass, testtable);
+ }
+
+ private static void addMutation(Map<ByteBuffer,List<ColumnUpdate>> mutations, String row, String cf, String cq, String value) {
+ ColumnUpdate update = new ColumnUpdate(ByteBuffer.wrap(cf.getBytes()), ByteBuffer.wrap(cq.getBytes()));
+ update.setValue(value.getBytes());
+ mutations.put(ByteBuffer.wrap(row.getBytes()), Collections.singletonList(update));
+ }
+
+ private static void addMutation(Map<ByteBuffer,List<ColumnUpdate>> mutations, String row, String cf, String cq, String vis, String value) {
+ ColumnUpdate update = new ColumnUpdate(ByteBuffer.wrap(cf.getBytes()), ByteBuffer.wrap(cq.getBytes()));
+ update.setValue(value.getBytes());
+ update.setColVisibility(vis.getBytes());
+ mutations.put(ByteBuffer.wrap(row.getBytes()), Collections.singletonList(update));
+ }
+
+ /**
+ * Insert 100000 cells which have as the row [0..99999] (padded with zeros). Set a range so only the entries between -Inf...5 come back (there should be
+ * 50,000)
+ */
+ @Test
+ public void readWriteBatchOneShotWithRange() throws Exception {
+ int maxInserts = 100000;
+ Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<>();
+ String format = "%1$05d";
+ for (int i = 0; i < maxInserts; i++) {
+ addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10));
+
+ if (i % 1000 == 0 || i == maxInserts - 1) {
+ tpc.proxy().updateAndFlush(userpass, testtable, mutations);
+ mutations.clear();
+ }
+ }
+
+ Key stop = new Key();
+ stop.setRow("5".getBytes());
+ BatchScanOptions options = new BatchScanOptions();
+ options.ranges = Collections.singletonList(new Range(null, false, stop, false));
+ String cookie = tpc.proxy().createBatchScanner(userpass, testtable, options);
+
+ int i = 0;
+ boolean hasNext = true;
+
+ int k = 1000;
+ while (hasNext) {
+ ScanResult kvList = tpc.proxy().nextK(cookie, k);
+ i += kvList.getResultsSize();
+ hasNext = kvList.isMore();
+ }
+ assertEquals(i, 50000);
+ }
+
+ /**
+ * Insert 100000 cells which have as the row [0..99999] (padded with zeros). Set a columnFamily so only the entries with specified column family come back
+ * (there should be 50,000)
+ */
+ @Test
+ public void readWriteBatchOneShotWithColumnFamilyOnly() throws Exception {
+ int maxInserts = 100000;
+ Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<>();
+ String format = "%1$05d";
+ for (int i = 0; i < maxInserts; i++) {
+
+ addMutation(mutations, String.format(format, i), "cf" + (i % 2), "cq" + (i % 2), Util.randString(10));
+
+ if (i % 1000 == 0 || i == maxInserts - 1) {
+ tpc.proxy().updateAndFlush(userpass, testtable, mutations);
+ mutations.clear();
+ }
+ }
+
+ BatchScanOptions options = new BatchScanOptions();
+
+ ScanColumn sc = new ScanColumn();
+ sc.colFamily = ByteBuffer.wrap("cf0".getBytes());
+
+ options.columns = Collections.singletonList(sc);
+ String cookie = tpc.proxy().createBatchScanner(userpass, testtable, options);
+
+ int i = 0;
+ boolean hasNext = true;
+
+ int k = 1000;
+ while (hasNext) {
+ ScanResult kvList = tpc.proxy().nextK(cookie, k);
+ i += kvList.getResultsSize();
+ hasNext = kvList.isMore();
+ }
+ assertEquals(i, 50000);
+ }
+
+ /**
+ * Insert 100000 cells which have as the row [0..99999] (padded with zeros). Set a columnFamily + columnQualififer so only the entries with specified column
+ * come back (there should be 50,000)
+ */
+ @Test
+ public void readWriteBatchOneShotWithFullColumn() throws Exception {
+ int maxInserts = 100000;
+ Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<>();
+ String format = "%1$05d";
+ for (int i = 0; i < maxInserts; i++) {
+
+ addMutation(mutations, String.format(format, i), "cf" + (i % 2), "cq" + (i % 2), Util.randString(10));
+
+ if (i % 1000 == 0 || i == maxInserts - 1) {
+ tpc.proxy().updateAndFlush(userpass, testtable, mutations);
+ mutations.clear();
+ }
+ }
+
+ BatchScanOptions options = new BatchScanOptions();
+
+ ScanColumn sc = new ScanColumn();
+ sc.colFamily = ByteBuffer.wrap("cf0".getBytes());
+ sc.colQualifier = ByteBuffer.wrap("cq0".getBytes());
+
+ options.columns = Collections.singletonList(sc);
+ String cookie = tpc.proxy().createBatchScanner(userpass, testtable, options);
+
+ int i = 0;
+ boolean hasNext = true;
+
+ int k = 1000;
+ while (hasNext) {
+ ScanResult kvList = tpc.proxy().nextK(cookie, k);
+ i += kvList.getResultsSize();
+ hasNext = kvList.isMore();
+ }
+ assertEquals(i, 50000);
+ }
+
+ /**
+ * Insert 100000 cells which have as the row [0..99999] (padded with zeros). Filter the results so only the even numbers come back.
+ */
+ @Test
+ public void readWriteBatchOneShotWithFilterIterator() throws Exception {
+ int maxInserts = 10000;
+ Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<>();
+ String format = "%1$05d";
+ for (int i = 0; i < maxInserts; i++) {
+ addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10));
+
+ if (i % 1000 == 0 || i == maxInserts - 1) {
+ tpc.proxy().updateAndFlush(userpass, testtable, mutations);
+ mutations.clear();
+ }
+
+ }
+
+ String regex = ".*[02468]";
+
+ org.apache.accumulo.core.client.IteratorSetting is = new org.apache.accumulo.core.client.IteratorSetting(50, regex, RegExFilter.class);
+ RegExFilter.setRegexs(is, regex, null, null, null, false);
+
+ IteratorSetting pis = Util.iteratorSetting2ProxyIteratorSetting(is);
+ ScanOptions opts = new ScanOptions();
+ opts.iterators = Collections.singletonList(pis);
+ String cookie = tpc.proxy().createScanner(userpass, testtable, opts);
+
+ int i = 0;
+ boolean hasNext = true;
+
+ int k = 1000;
+ while (hasNext) {
+ ScanResult kvList = tpc.proxy().nextK(cookie, k);
+ for (KeyValue kv : kvList.getResults()) {
+ assertEquals(Integer.parseInt(new String(kv.getKey().getRow())), i);
+
+ i += 2;
+ }
+ hasNext = kvList.isMore();
+ }
+ }
+
+ @Test
+ public void readWriteOneShotWithRange() throws Exception {
+ int maxInserts = 100000;
+ Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<>();
+ String format = "%1$05d";
+ for (int i = 0; i < maxInserts; i++) {
+ addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10));
+
+ if (i % 1000 == 0 || i == maxInserts - 1) {
+ tpc.proxy().updateAndFlush(userpass, testtable, mutations);
+ mutations.clear();
+ }
+ }
+
+ Key stop = new Key();
+ stop.setRow("5".getBytes());
+ ScanOptions opts = new ScanOptions();
+ opts.range = new Range(null, false, stop, false);
+ String cookie = tpc.proxy().createScanner(userpass, testtable, opts);
+
+ int i = 0;
+ boolean hasNext = true;
+
+ int k = 1000;
+ while (hasNext) {
+ ScanResult kvList = tpc.proxy().nextK(cookie, k);
+ i += kvList.getResultsSize();
+ hasNext = kvList.isMore();
+ }
+ assertEquals(i, 50000);
+ }
+
+ /**
+ * Insert 100000 cells which have as the row [0..99999] (padded with zeros). Filter the results so only the even numbers come back.
+ */
+ @Test
+ public void readWriteOneShotWithFilterIterator() throws Exception {
+ int maxInserts = 10000;
+ Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<>();
+ String format = "%1$05d";
+ for (int i = 0; i < maxInserts; i++) {
+ addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10));
+
+ if (i % 1000 == 0 || i == maxInserts - 1) {
+
+ tpc.proxy().updateAndFlush(userpass, testtable, mutations);
+ mutations.clear();
+
+ }
+
+ }
+
+ String regex = ".*[02468]";
+
+ org.apache.accumulo.core.client.IteratorSetting is = new org.apache.accumulo.core.client.IteratorSetting(50, regex, RegExFilter.class);
+ RegExFilter.setRegexs(is, regex, null, null, null, false);
+
+ IteratorSetting pis = Util.iteratorSetting2ProxyIteratorSetting(is);
+ ScanOptions opts = new ScanOptions();
+ opts.iterators = Collections.singletonList(pis);
+ String cookie = tpc.proxy().createScanner(userpass, testtable, opts);
+
+ int i = 0;
+ boolean hasNext = true;
+
+ int k = 1000;
+ while (hasNext) {
+ ScanResult kvList = tpc.proxy().nextK(cookie, k);
+ for (KeyValue kv : kvList.getResults()) {
+ assertEquals(Integer.parseInt(new String(kv.getKey().getRow())), i);
+
+ i += 2;
+ }
+ hasNext = kvList.isMore();
+ }
+ }
+
+ // @Test
+ // This test takes kind of a long time. Enable it if you think you may have memory issues.
+ public void manyWritesAndReads() throws Exception {
+ int maxInserts = 1000000;
+ Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<>();
+ String format = "%1$06d";
+ String writer = tpc.proxy().createWriter(userpass, testtable, null);
+ for (int i = 0; i < maxInserts; i++) {
+ addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10));
+
+ if (i % 1000 == 0 || i == maxInserts - 1) {
+
+ tpc.proxy().update(writer, mutations);
+ mutations.clear();
+
+ }
+
+ }
+
+ tpc.proxy().flush(writer);
+ tpc.proxy().closeWriter(writer);
+
+ String cookie = tpc.proxy().createScanner(userpass, testtable, null);
+
+ int i = 0;
+ boolean hasNext = true;
+
+ int k = 1000;
+ while (hasNext) {
+ ScanResult kvList = tpc.proxy().nextK(cookie, k);
+ for (KeyValue kv : kvList.getResults()) {
+ assertEquals(Integer.parseInt(new String(kv.getKey().getRow())), i);
+ i++;
+ }
+ hasNext = kvList.isMore();
+ if (hasNext)
+ assertEquals(k, kvList.getResults().size());
+ }
+ assertEquals(maxInserts, i);
+ }
+
+ @Test
+ public void asynchReadWrite() throws Exception {
+ int maxInserts = 10000;
+ Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<>();
+ String format = "%1$05d";
+ String writer = tpc.proxy().createWriter(userpass, testtable, null);
+ for (int i = 0; i < maxInserts; i++) {
+ addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10));
+
+ if (i % 1000 == 0 || i == maxInserts - 1) {
+ tpc.proxy().update(writer, mutations);
+ mutations.clear();
+ }
+ }
+
+ tpc.proxy().flush(writer);
+ tpc.proxy().closeWriter(writer);
+
+ String regex = ".*[02468]";
+
+ org.apache.accumulo.core.client.IteratorSetting is = new org.apache.accumulo.core.client.IteratorSetting(50, regex, RegExFilter.class);
+ RegExFilter.setRegexs(is, regex, null, null, null, false);
+
+ IteratorSetting pis = Util.iteratorSetting2ProxyIteratorSetting(is);
+ ScanOptions opts = new ScanOptions();
+ opts.iterators = Collections.singletonList(pis);
+ String cookie = tpc.proxy().createScanner(userpass, testtable, opts);
+
+ int i = 0;
+ boolean hasNext = true;
+
+ int k = 1000;
+ int numRead = 0;
+ while (hasNext) {
+ ScanResult kvList = tpc.proxy().nextK(cookie, k);
+ for (KeyValue kv : kvList.getResults()) {
+ assertEquals(i, Integer.parseInt(new String(kv.getKey().getRow())));
+ numRead++;
+ i += 2;
+ }
+ hasNext = kvList.isMore();
+ }
+ assertEquals(maxInserts / 2, numRead);
+ }
+
+ @Test
+ public void testVisibility() throws Exception {
+
+ Set<ByteBuffer> auths = new HashSet<>();
+ auths.add(ByteBuffer.wrap("even".getBytes()));
+ tpc.proxy().changeUserAuthorizations(userpass, "root", auths);
+
+ int maxInserts = 10000;
+ Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<>();
+ String format = "%1$05d";
+ String writer = tpc.proxy().createWriter(userpass, testtable, null);
+ for (int i = 0; i < maxInserts; i++) {
+ if (i % 2 == 0)
+ addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, "even", Util.randString(10));
+ else
+ addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, "odd", Util.randString(10));
+
+ if (i % 1000 == 0 || i == maxInserts - 1) {
+ tpc.proxy().update(writer, mutations);
+ mutations.clear();
+ }
+ }
+
+ tpc.proxy().flush(writer);
+ tpc.proxy().closeWriter(writer);
+ ScanOptions opts = new ScanOptions();
+ opts.authorizations = auths;
+ String cookie = tpc.proxy().createScanner(userpass, testtable, opts);
+
+ int i = 0;
+ boolean hasNext = true;
+
+ int k = 1000;
+ int numRead = 0;
+ while (hasNext) {
+ ScanResult kvList = tpc.proxy().nextK(cookie, k);
+ for (KeyValue kv : kvList.getResults()) {
+ assertEquals(Integer.parseInt(new String(kv.getKey().getRow())), i);
+ i += 2;
+ numRead++;
+ }
+ hasNext = kvList.isMore();
+
+ }
+ assertEquals(maxInserts / 2, numRead);
+ }
+
+}
diff --cc test/src/main/java/org/apache/accumulo/test/proxy/TestProxySecurityOperations.java
index fa6c52e,0000000..7c20381
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/proxy/TestProxySecurityOperations.java
+++ b/test/src/main/java/org/apache/accumulo/test/proxy/TestProxySecurityOperations.java
@@@ -1,160 -1,0 +1,159 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.proxy;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.util.ByteBufferUtil;
++import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.proxy.Proxy;
+import org.apache.accumulo.proxy.thrift.NamespacePermission;
+import org.apache.accumulo.proxy.thrift.SystemPermission;
+import org.apache.accumulo.proxy.thrift.TablePermission;
+import org.apache.accumulo.proxy.thrift.TimeType;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.server.TServer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
- import com.google.common.net.HostAndPort;
-
+public class TestProxySecurityOperations {
+ protected static TServer proxy;
+ protected static TestProxyClient tpc;
+ protected static ByteBuffer userpass;
+ protected static final int port = 10196;
+ protected static final String testtable = "testtable";
+ protected static final String testuser = "VonJines";
+ protected static final String testnamespace = "testns";
+ protected static final ByteBuffer testpw = ByteBuffer.wrap("fiveones".getBytes());
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ Properties prop = new Properties();
+ prop.setProperty("useMockInstance", "true");
+ prop.put("tokenClass", PasswordToken.class.getName());
+
+ proxy = Proxy.createProxyServer(HostAndPort.fromParts("localhost", port), new TCompactProtocol.Factory(), prop).server;
+ while (!proxy.isServing()) {
+ Thread.sleep(500);
+ }
+ tpc = new TestProxyClient("localhost", port);
+ userpass = tpc.proxy().login("root", Collections.singletonMap("password", ""));
+ }
+
+ @AfterClass
+ public static void tearDown() throws InterruptedException {
+ proxy.stop();
+ }
+
+ @Before
+ public void makeTestTableAndUser() throws Exception {
+ tpc.proxy().createTable(userpass, testtable, true, TimeType.MILLIS);
+ tpc.proxy().createLocalUser(userpass, testuser, testpw);
+ tpc.proxy().createNamespace(userpass, testnamespace);
+ }
+
+ @After
+ public void deleteTestTable() throws Exception {
+ tpc.proxy().deleteTable(userpass, testtable);
+ tpc.proxy().dropLocalUser(userpass, testuser);
+ tpc.proxy().deleteNamespace(userpass, testnamespace);
+ }
+
+ @Test
+ public void create() throws TException {
+ tpc.proxy().createLocalUser(userpass, testuser + "2", testpw);
+ assertTrue(tpc.proxy().listLocalUsers(userpass).contains(testuser + "2"));
+ tpc.proxy().dropLocalUser(userpass, testuser + "2");
+ assertTrue(!tpc.proxy().listLocalUsers(userpass).contains(testuser + "2"));
+ }
+
+ @Test
+ public void authenticate() throws TException {
+ assertTrue(tpc.proxy().authenticateUser(userpass, testuser, bb2pp(testpw)));
+ assertFalse(tpc.proxy().authenticateUser(userpass, "EvilUser", bb2pp(testpw)));
+
+ tpc.proxy().changeLocalUserPassword(userpass, testuser, ByteBuffer.wrap("newpass".getBytes()));
+ assertFalse(tpc.proxy().authenticateUser(userpass, testuser, bb2pp(testpw)));
+ assertTrue(tpc.proxy().authenticateUser(userpass, testuser, bb2pp(ByteBuffer.wrap("newpass".getBytes()))));
+
+ }
+
+ @Test
+ public void tablePermissions() throws TException {
+ tpc.proxy().grantTablePermission(userpass, testuser, testtable, TablePermission.ALTER_TABLE);
+ assertTrue(tpc.proxy().hasTablePermission(userpass, testuser, testtable, TablePermission.ALTER_TABLE));
+
+ tpc.proxy().revokeTablePermission(userpass, testuser, testtable, TablePermission.ALTER_TABLE);
+ assertFalse(tpc.proxy().hasTablePermission(userpass, testuser, testtable, TablePermission.ALTER_TABLE));
+
+ }
+
+ @Test
+ public void systemPermissions() throws TException {
+ tpc.proxy().grantSystemPermission(userpass, testuser, SystemPermission.ALTER_USER);
+ assertTrue(tpc.proxy().hasSystemPermission(userpass, testuser, SystemPermission.ALTER_USER));
+
+ tpc.proxy().revokeSystemPermission(userpass, testuser, SystemPermission.ALTER_USER);
+ assertFalse(tpc.proxy().hasSystemPermission(userpass, testuser, SystemPermission.ALTER_USER));
+
+ }
+
+ @Test
+ public void auths() throws TException {
+ HashSet<ByteBuffer> newauths = new HashSet<>();
+ newauths.add(ByteBuffer.wrap("BBR".getBytes()));
+ newauths.add(ByteBuffer.wrap("Barney".getBytes()));
+ tpc.proxy().changeUserAuthorizations(userpass, testuser, newauths);
+ List<ByteBuffer> actualauths = tpc.proxy().getUserAuthorizations(userpass, testuser);
+ assertEquals(actualauths.size(), newauths.size());
+
+ for (ByteBuffer auth : actualauths) {
+ assertTrue(newauths.contains(auth));
+ }
+ }
+
+ @Test
+ public void namespacePermissions() throws TException {
+ tpc.proxy().grantNamespacePermission(userpass, testuser, testnamespace, NamespacePermission.ALTER_NAMESPACE);
+ assertTrue(tpc.proxy().hasNamespacePermission(userpass, testuser, testnamespace, NamespacePermission.ALTER_NAMESPACE));
+
+ tpc.proxy().revokeNamespacePermission(userpass, testuser, testnamespace, NamespacePermission.ALTER_NAMESPACE);
+ assertFalse(tpc.proxy().hasNamespacePermission(userpass, testuser, testnamespace, NamespacePermission.ALTER_NAMESPACE));
+ }
+
+ private Map<String,String> bb2pp(ByteBuffer cf) {
+ Map<String,String> toRet = new TreeMap<>();
+ toRet.put("password", ByteBufferUtil.toString(cf));
+ return toRet;
+ }
+
+}
diff --cc test/src/main/java/org/apache/accumulo/test/proxy/TestProxyTableOperations.java
index 404bcbe,0000000..302ec14
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyTableOperations.java
+++ b/test/src/main/java/org/apache/accumulo/test/proxy/TestProxyTableOperations.java
@@@ -1,202 -1,0 +1,201 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.proxy;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
++import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.proxy.Proxy;
+import org.apache.accumulo.proxy.thrift.ColumnUpdate;
+import org.apache.accumulo.proxy.thrift.TimeType;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.server.TServer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
- import com.google.common.net.HostAndPort;
-
+public class TestProxyTableOperations {
+
+ protected static TServer proxy;
+ protected static TestProxyClient tpc;
+ protected static ByteBuffer userpass;
+ protected static final int port = 10195;
+ protected static final String testtable = "testtable";
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ Properties prop = new Properties();
+ prop.setProperty("useMockInstance", "true");
+ prop.put("tokenClass", PasswordToken.class.getName());
+
+ proxy = Proxy.createProxyServer(HostAndPort.fromParts("localhost", port), new TCompactProtocol.Factory(), prop).server;
+ while (!proxy.isServing()) {
+ Thread.sleep(500);
+ }
+ tpc = new TestProxyClient("localhost", port);
+ userpass = tpc.proxy().login("root", Collections.singletonMap("password", ""));
+ }
+
+ @AfterClass
+ public static void tearDown() throws InterruptedException {
+ proxy.stop();
+ }
+
+ @Before
+ public void makeTestTable() throws Exception {
+ tpc.proxy().createTable(userpass, testtable, true, TimeType.MILLIS);
+ }
+
+ @After
+ public void deleteTestTable() throws Exception {
+ tpc.proxy().deleteTable(userpass, testtable);
+ }
+
+ @Test
+ public void createExistsDelete() throws TException {
+ assertFalse(tpc.proxy().tableExists(userpass, "testtable2"));
+ tpc.proxy().createTable(userpass, "testtable2", true, TimeType.MILLIS);
+ assertTrue(tpc.proxy().tableExists(userpass, "testtable2"));
+ tpc.proxy().deleteTable(userpass, "testtable2");
+ assertFalse(tpc.proxy().tableExists(userpass, "testtable2"));
+ }
+
+ @Test
+ public void listRename() throws TException {
+ assertFalse(tpc.proxy().tableExists(userpass, "testtable2"));
+ tpc.proxy().renameTable(userpass, testtable, "testtable2");
+ assertTrue(tpc.proxy().tableExists(userpass, "testtable2"));
+ tpc.proxy().renameTable(userpass, "testtable2", testtable);
+ assertTrue(tpc.proxy().listTables(userpass).contains("testtable"));
+
+ }
+
+ // This test does not yet function because the backing Mock instance does not yet support merging
+ @Test
+ public void merge() throws TException {
+ Set<ByteBuffer> splits = new HashSet<>();
+ splits.add(ByteBuffer.wrap("a".getBytes()));
+ splits.add(ByteBuffer.wrap("c".getBytes()));
+ splits.add(ByteBuffer.wrap("z".getBytes()));
+ tpc.proxy().addSplits(userpass, testtable, splits);
+
+ tpc.proxy().mergeTablets(userpass, testtable, ByteBuffer.wrap("b".getBytes()), ByteBuffer.wrap("d".getBytes()));
+
+ splits.remove(ByteBuffer.wrap("c".getBytes()));
+
+ List<ByteBuffer> tableSplits = tpc.proxy().listSplits(userpass, testtable, 10);
+
+ for (ByteBuffer split : tableSplits)
+ assertTrue(splits.contains(split));
+ assertTrue(tableSplits.size() == splits.size());
+
+ }
+
+ @Test
+ public void splits() throws TException {
+ Set<ByteBuffer> splits = new HashSet<>();
+ splits.add(ByteBuffer.wrap("a".getBytes()));
+ splits.add(ByteBuffer.wrap("b".getBytes()));
+ splits.add(ByteBuffer.wrap("z".getBytes()));
+ tpc.proxy().addSplits(userpass, testtable, splits);
+
+ List<ByteBuffer> tableSplits = tpc.proxy().listSplits(userpass, testtable, 10);
+
+ for (ByteBuffer split : tableSplits)
+ assertTrue(splits.contains(split));
+ assertTrue(tableSplits.size() == splits.size());
+ }
+
+ @Test
+ public void constraints() throws TException {
+ int cid = tpc.proxy().addConstraint(userpass, testtable, "org.apache.accumulo.TestConstraint");
+ Map<String,Integer> constraints = tpc.proxy().listConstraints(userpass, testtable);
+ assertEquals((int) constraints.get("org.apache.accumulo.TestConstraint"), cid);
+ tpc.proxy().removeConstraint(userpass, testtable, cid);
+ constraints = tpc.proxy().listConstraints(userpass, testtable);
+ assertNull(constraints.get("org.apache.accumulo.TestConstraint"));
+ }
+
+ @Test
+ public void localityGroups() throws TException {
+ Map<String,Set<String>> groups = new HashMap<>();
+ Set<String> group1 = new HashSet<>();
+ group1.add("cf1");
+ groups.put("group1", group1);
+ Set<String> group2 = new HashSet<>();
+ group2.add("cf2");
+ group2.add("cf3");
+ groups.put("group2", group2);
+ tpc.proxy().setLocalityGroups(userpass, testtable, groups);
+
+ Map<String,Set<String>> actualGroups = tpc.proxy().getLocalityGroups(userpass, testtable);
+
+ assertEquals(groups.size(), actualGroups.size());
+ for (String groupName : groups.keySet()) {
+ assertTrue(actualGroups.containsKey(groupName));
+ assertEquals(groups.get(groupName).size(), actualGroups.get(groupName).size());
+ for (String cf : groups.get(groupName)) {
+ assertTrue(actualGroups.get(groupName).contains(cf));
+ }
+ }
+ }
+
+ @Test
+ public void tableProperties() throws TException {
+ tpc.proxy().setTableProperty(userpass, testtable, "test.property1", "wharrrgarbl");
+ assertEquals(tpc.proxy().getTableProperties(userpass, testtable).get("test.property1"), "wharrrgarbl");
+ tpc.proxy().removeTableProperty(userpass, testtable, "test.property1");
+ assertNull(tpc.proxy().getTableProperties(userpass, testtable).get("test.property1"));
+ }
+
+ private static void addMutation(Map<ByteBuffer,List<ColumnUpdate>> mutations, String row, String cf, String cq, String value) {
+ ColumnUpdate update = new ColumnUpdate(ByteBuffer.wrap(cf.getBytes()), ByteBuffer.wrap(cq.getBytes()));
+ update.setValue(value.getBytes());
+ mutations.put(ByteBuffer.wrap(row.getBytes()), Collections.singletonList(update));
+ }
+
+ @Test
+ public void tableOperationsRowMethods() throws TException {
+ Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<>();
+ for (int i = 0; i < 10; i++) {
+ addMutation(mutations, "" + i, "cf", "cq", "");
+ }
+ tpc.proxy().updateAndFlush(userpass, testtable, mutations);
+
+ assertEquals(tpc.proxy().getMaxRow(userpass, testtable, null, null, true, null, true), ByteBuffer.wrap("9".getBytes()));
+
+ tpc.proxy().deleteRows(userpass, testtable, ByteBuffer.wrap("51".getBytes()), ByteBuffer.wrap("99".getBytes()));
+ assertEquals(tpc.proxy().getMaxRow(userpass, testtable, null, null, true, null, true), ByteBuffer.wrap("5".getBytes()));
+ }
+
+}
diff --cc test/src/main/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
index fccb238,0000000..4dd3ee0
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
@@@ -1,415 -1,0 +1,414 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.impl.ClientExecReturn;
+import org.apache.accumulo.core.client.impl.Credentials;
+import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.master.thrift.MasterClientService;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
+import org.apache.accumulo.core.trace.Tracer;
++import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.log.WalStateManager;
+import org.apache.accumulo.server.log.WalStateManager.WalState;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
- import com.google.common.net.HostAndPort;
-
+/**
+ * ACCUMULO-3302 series of tests which ensure that a WAL is prematurely closed when a TServer may still continue to use it. Checking that no tablet references a
+ * WAL is insufficient to determine if a WAL will never be used in the future.
+ */
+public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacBase {
+ private static final Logger log = LoggerFactory.getLogger(GarbageCollectorCommunicatesWithTServersIT.class);
+
+ private final int GC_PERIOD_SECONDS = 1;
+
+ @Override
+ public int defaultTimeoutSeconds() {
+ return 2 * 60;
+ }
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
+ cfg.setNumTservers(1);
+ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+ cfg.setProperty(Property.GC_CYCLE_DELAY, GC_PERIOD_SECONDS + "s");
+ // Wait longer to try to let the replication table come online before a cycle runs
+ cfg.setProperty(Property.GC_CYCLE_START, "10s");
+ cfg.setProperty(Property.REPLICATION_NAME, "master");
+ // Set really long delays for the master to do stuff for replication. We don't need
+ // it to be doing anything, so just let it sleep
+ cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_DELAY, "240s");
+ cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "240s");
+ cfg.setProperty(Property.REPLICATION_DRIVER_DELAY, "240s");
+ // Pull down the maximum size of the wal so we can test close()'ing it.
+ cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
+ coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ }
+
+ /**
+ * Fetch all of the WALs referenced by tablets in the metadata table for this table
+ */
+ private Set<String> getWalsForTable(String tableName) throws Exception {
+ final Connector conn = getConnector();
+ final String tableId = conn.tableOperations().tableIdMap().get(tableName);
+
+ Assert.assertNotNull("Could not determine table ID for " + tableName, tableId);
+
+ Instance i = conn.getInstance();
+ ZooReaderWriter zk = new ZooReaderWriter(i.getZooKeepers(), i.getZooKeepersSessionTimeOut(), "");
+ WalStateManager wals = new WalStateManager(conn.getInstance(), zk);
+
+ Set<String> result = new HashSet<>();
+ for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
+ log.debug("Reading WALs: {}={}", entry.getKey(), entry.getValue());
+ result.add(entry.getKey().toString());
+ }
+ return result;
+ }
+
+ /**
+ * Fetch all of the rfiles referenced by tablets in the metadata table for this table
+ */
+ private Set<String> getFilesForTable(String tableName) throws Exception {
+ final Connector conn = getConnector();
+ final String tableId = conn.tableOperations().tableIdMap().get(tableName);
+
+ Assert.assertNotNull("Could not determine table ID for " + tableName, tableId);
+
+ Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ Range r = MetadataSchema.TabletsSection.getRange(tableId);
+ s.setRange(r);
+ s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+
+ Set<String> rfiles = new HashSet<>();
+ for (Entry<Key,Value> entry : s) {
+ log.debug("Reading RFiles: {}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
+ // uri://path/to/wal
+ String cq = entry.getKey().getColumnQualifier().toString();
+ String path = new Path(cq).toString();
+ log.debug("Normalize path to rfile: {}", path);
+ rfiles.add(path);
+ }
+
+ return rfiles;
+ }
+
+ /**
+ * Get the replication status messages for the given table that exist in the metadata table (~repl entries)
+ */
+ private Map<String,Status> getMetadataStatusForTable(String tableName) throws Exception {
+ final Connector conn = getConnector();
+ final String tableId = conn.tableOperations().tableIdMap().get(tableName);
+
+ Assert.assertNotNull("Could not determine table ID for " + tableName, tableId);
+
+ Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ Range r = MetadataSchema.ReplicationSection.getRange();
+ s.setRange(r);
+ s.fetchColumn(MetadataSchema.ReplicationSection.COLF, new Text(tableId));
+
+ Map<String,Status> fileToStatus = new HashMap<>();
+ for (Entry<Key,Value> entry : s) {
+ Text file = new Text();
+ MetadataSchema.ReplicationSection.getFile(entry.getKey(), file);
+ Status status = Status.parseFrom(entry.getValue().get());
+ log.info("Got status for {}: {}", file, ProtobufUtil.toString(status));
+ fileToStatus.put(file.toString(), status);
+ }
+
+ return fileToStatus;
+ }
+
+ @Test
+ public void testActiveWalPrecludesClosing() throws Exception {
+ final String table = getUniqueNames(1)[0];
+ final Connector conn = getConnector();
+
+ // Bring the replication table online first and foremost
+ ReplicationTable.setOnline(conn);
+
+ log.info("Creating {}", table);
+ conn.tableOperations().create(table);
+
+ conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
+
+ log.info("Writing a few mutations to the table");
+
+ BatchWriter bw = conn.createBatchWriter(table, null);
+
+ byte[] empty = new byte[0];
+ for (int i = 0; i < 5; i++) {
+ Mutation m = new Mutation(Integer.toString(i));
+ m.put(empty, empty, empty);
+ bw.addMutation(m);
+ }
+
+ log.info("Flushing mutations to the server");
+ bw.flush();
+
+ log.info("Checking that metadata only has two WALs recorded for this table (inUse, and opened)");
+
+ Set<String> wals = getWalsForTable(table);
+ Assert.assertEquals("Expected to only find two WALs for the table", 2, wals.size());
+
+ // Flush our test table to remove the WAL references in it
+ conn.tableOperations().flush(table, null, null, true);
+ // Flush the metadata table too because it will have a reference to the WAL
+ conn.tableOperations().flush(MetadataTable.NAME, null, null, true);
+
+ log.info("Waiting for replication table to come online");
+
+ log.info("Fetching replication statuses from metadata table");
+
+ Map<String,Status> fileToStatus = getMetadataStatusForTable(table);
+
+ Assert.assertEquals("Expected to only find one replication status message", 1, fileToStatus.size());
+
+ String walName = fileToStatus.keySet().iterator().next();
+ wals.retainAll(fileToStatus.keySet());
+ Assert.assertEquals(1, wals.size());
+
+ Status status = fileToStatus.get(walName);
+
+ Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed());
+
+ Set<String> filesForTable = getFilesForTable(table);
+ Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size());
+ log.info("Files for table before MajC: {}", filesForTable);
+
+ // Issue a MajC to roll a new file in HDFS
+ conn.tableOperations().compact(table, null, null, false, true);
+
+ Set<String> filesForTableAfterCompaction = getFilesForTable(table);
+
+ log.info("Files for table after MajC: {}", filesForTableAfterCompaction);
+
+ Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTableAfterCompaction.size());
+ Assert.assertNotEquals("Expected the files before and after compaction to differ", filesForTableAfterCompaction, filesForTable);
+
+ // Use the rfile which was just replaced by the MajC to determine when the GC has ran
+ Path fileToBeDeleted = new Path(filesForTable.iterator().next());
+ FileSystem fs = getCluster().getFileSystem();
+
+ boolean fileExists = fs.exists(fileToBeDeleted);
+ while (fileExists) {
+ log.info("File which should get deleted still exists: {}", fileToBeDeleted);
+ Thread.sleep(2000);
+ fileExists = fs.exists(fileToBeDeleted);
+ }
+
+ Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table);
+ Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size());
+
+ Assert.assertEquals("Status before and after MinC should be identical", fileToStatus, fileToStatusAfterMinc);
+ }
+
+ @Test(timeout = 2 * 60 * 1000)
+ public void testUnreferencedWalInTserverIsClosed() throws Exception {
+ final String[] names = getUniqueNames(2);
+ // `table` will be replicated, `otherTable` is only used to roll the WAL on the tserver
+ final String table = names[0], otherTable = names[1];
+ final Connector conn = getConnector();
+
+ // Bring the replication table online first and foremost
+ ReplicationTable.setOnline(conn);
+
+ log.info("Creating {}", table);
+ conn.tableOperations().create(table);
+
+ conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
+
+ log.info("Writing a few mutations to the table");
+
+ BatchWriter bw = conn.createBatchWriter(table, null);
+
+ byte[] empty = new byte[0];
+ for (int i = 0; i < 5; i++) {
+ Mutation m = new Mutation(Integer.toString(i));
+ m.put(empty, empty, empty);
+ bw.addMutation(m);
+ }
+
+ log.info("Flushing mutations to the server");
+ bw.close();
+
+ log.info("Checking that metadata only has one WAL recorded for this table");
+
+ Set<String> wals = getWalsForTable(table);
+ Assert.assertEquals("Expected to only find two WAL for the table", 2, wals.size());
+
+ log.info("Compacting the table which will remove all WALs from the tablets");
+
+ // Flush our test table to remove the WAL references in it
+ conn.tableOperations().flush(table, null, null, true);
+ // Flush the metadata table too because it will have a reference to the WAL
+ conn.tableOperations().flush(MetadataTable.NAME, null, null, true);
+
+ log.info("Fetching replication statuses from metadata table");
+
+ Map<String,Status> fileToStatus = getMetadataStatusForTable(table);
+
+ Assert.assertEquals("Expected to only find one replication status message", 1, fileToStatus.size());
+
+ String walName = fileToStatus.keySet().iterator().next();
+ Assert.assertTrue("Expected log file name from tablet to equal replication entry", wals.contains(walName));
+
+ Status status = fileToStatus.get(walName);
+
+ Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed());
+
+ Set<String> filesForTable = getFilesForTable(table);
+ Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size());
+ log.info("Files for table before MajC: {}", filesForTable);
+
+ // Issue a MajC to roll a new file in HDFS
+ conn.tableOperations().compact(table, null, null, false, true);
+
+ Set<String> filesForTableAfterCompaction = getFilesForTable(table);
+
+ log.info("Files for table after MajC: {}", filesForTableAfterCompaction);
+
+ Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTableAfterCompaction.size());
+ Assert.assertNotEquals("Expected the files before and after compaction to differ", filesForTableAfterCompaction, filesForTable);
+
+ // Use the rfile which was just replaced by the MajC to determine when the GC has ran
+ Path fileToBeDeleted = new Path(filesForTable.iterator().next());
+ FileSystem fs = getCluster().getFileSystem();
+
+ boolean fileExists = fs.exists(fileToBeDeleted);
+ while (fileExists) {
+ log.info("File which should get deleted still exists: {}", fileToBeDeleted);
+ Thread.sleep(2000);
+ fileExists = fs.exists(fileToBeDeleted);
+ }
+
+ // At this point in time, we *know* that the GarbageCollector has run which means that the Status
+ // for our WAL should not be altered.
+
+ Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table);
+ Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size());
+
+ /*
+ * To verify that the WALs is still getting closed, we have to force the tserver to close the existing WAL and open a new one instead. The easiest way to do
+ * this is to write a load of data that will exceed the 1.33% full threshold that the logger keeps track of
+ */
+
+ conn.tableOperations().create(otherTable);
+ bw = conn.createBatchWriter(otherTable, null);
+ // 500k
+ byte[] bigValue = new byte[1024 * 500];
+ Arrays.fill(bigValue, (byte) 1);
+ // 500k * 50
+ for (int i = 0; i < 50; i++) {
+ Mutation m = new Mutation(Integer.toString(i));
+ m.put(empty, empty, bigValue);
+ bw.addMutation(m);
+ if (i % 10 == 0) {
+ bw.flush();
+ }
+ }
+
+ bw.close();
+
+ conn.tableOperations().flush(otherTable, null, null, true);
+
+ // Get the tservers which the master deems as active
+ final ClientContext context = new ClientContext(conn.getInstance(), new Credentials("root", new PasswordToken(ConfigurableMacBase.ROOT_PASSWORD)),
+ getClientConfig());
+ List<String> tservers = MasterClient.execute(context, new ClientExecReturn<List<String>,MasterClientService.Client>() {
+ @Override
+ public List<String> execute(MasterClientService.Client client) throws Exception {
+ return client.getActiveTservers(Tracer.traceInfo(), context.rpcCreds());
+ }
+ });
+
+ Assert.assertEquals("Expected only one active tservers", 1, tservers.size());
+
+ HostAndPort tserver = HostAndPort.fromString(tservers.get(0));
+
+ // Get the active WALs from that server
+ log.info("Fetching active WALs from {}", tserver);
+
+ Client client = ThriftUtil.getTServerClient(tserver, context);
+ List<String> activeWalsForTserver = client.getActiveLogs(Tracer.traceInfo(), context.rpcCreds());
+
+ log.info("Active wals: {}", activeWalsForTserver);
+
+ Assert.assertEquals("Expected to find only one active WAL", 1, activeWalsForTserver.size());
+
+ String activeWal = new Path(activeWalsForTserver.get(0)).toString();
+
+ Assert.assertNotEquals("Current active WAL on tserver should not be the original WAL we saw", walName, activeWal);
+
+ log.info("Ensuring that replication status does get closed after WAL is no longer in use by Tserver");
+
+ do {
+ Map<String,Status> replicationStatuses = getMetadataStatusForTable(table);
+
+ log.info("Got replication status messages {}", replicationStatuses);
+ Assert.assertEquals("Did not expect to find additional status records", 1, replicationStatuses.size());
+
+ status = replicationStatuses.values().iterator().next();
+ log.info("Current status: {}", ProtobufUtil.toString(status));
+
+ if (status.getClosed()) {
+ return;
+ }
+
+ log.info("Status is not yet closed, waiting for garbage collector to close it");
+
+ Thread.sleep(2000);
+ } while (true);
+ }
+}
diff --cc test/src/main/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java
index 72cb569,0000000..c9d4126
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java
@@@ -1,115 -1,0 +1,115 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.replication.ReplicationConstants;
+import org.apache.accumulo.core.security.Authorizations;
++import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterables;
- import com.google.common.net.HostAndPort;
+
+/**
+ *
+ */
+public class MultiTserverReplicationIT extends ConfigurableMacBase {
+ private static final Logger log = LoggerFactory.getLogger(MultiTserverReplicationIT.class);
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setNumTservers(2);
+ }
+
+ @Test
+ public void tserverReplicationServicePortsAreAdvertised() throws Exception {
+ // Wait for the cluster to be up
+ Connector conn = getConnector();
+ Instance inst = conn.getInstance();
+
+ // Wait for a tserver to come up to fulfill this request
+ conn.tableOperations().create("foo");
+ Scanner s = conn.createScanner("foo", Authorizations.EMPTY);
+ Assert.assertEquals(0, Iterables.size(s));
+
+ ZooReader zreader = new ZooReader(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut());
+ Set<String> tserverHost = new HashSet<>();
+ tserverHost.addAll(zreader.getChildren(ZooUtil.getRoot(inst) + Constants.ZTSERVERS));
+
+ Set<HostAndPort> replicationServices = new HashSet<>();
+
+ for (String tserver : tserverHost) {
+ try {
+ byte[] portData = zreader.getData(ZooUtil.getRoot(inst) + ReplicationConstants.ZOO_TSERVERS + "/" + tserver, null);
+ HostAndPort replAddress = HostAndPort.fromString(new String(portData, UTF_8));
+ replicationServices.add(replAddress);
+ } catch (Exception e) {
+ log.error("Could not find port for {}", tserver, e);
+ Assert.fail("Did not find replication port advertisement for " + tserver);
+ }
+ }
+
+ // Each tserver should also have equial replicaiton services running internally
+ Assert.assertEquals("Expected an equal number of replication servicers and tservers", tserverHost.size(), replicationServices.size());
+ }
+
+ @Test
+ public void masterReplicationServicePortsAreAdvertised() throws Exception {
+ // Wait for the cluster to be up
+ Connector conn = getConnector();
+ Instance inst = conn.getInstance();
+
+ // Wait for a tserver to come up to fulfill this request
+ conn.tableOperations().create("foo");
+ Scanner s = conn.createScanner("foo", Authorizations.EMPTY);
+ Assert.assertEquals(0, Iterables.size(s));
+
+ ZooReader zreader = new ZooReader(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut());
+
+ // Should have one master instance
+ Assert.assertEquals(1, inst.getMasterLocations().size());
+
+ // Get the master thrift service addr
+ String masterAddr = Iterables.getOnlyElement(inst.getMasterLocations());
+
+ // Get the master replication coordinator addr
+ String replCoordAddr = new String(zreader.getData(ZooUtil.getRoot(inst) + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR, null), UTF_8);
+
+ // They shouldn't be the same
+ Assert.assertNotEquals(masterAddr, replCoordAddr);
+
+ // Neither should be zero as the port
+ Assert.assertNotEquals(0, HostAndPort.fromString(masterAddr).getPort());
+ Assert.assertNotEquals(0, HostAndPort.fromString(replCoordAddr).getPort());
+ }
+}
--
To stop receiving notification emails like this one, please contact
"commits@accumulo.apache.org" <co...@accumulo.apache.org>.