You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/06/04 20:53:10 UTC
[29/43] accumulo git commit: ACCUMULO-3871 move ITs into distro jar,
stop building test jar
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/ScanRangeIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ScanRangeIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ScanRangeIT.java
new file mode 100644
index 0000000..bd7555e
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScanRangeIT.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.Map.Entry;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class ScanRangeIT extends AccumuloClusterHarness {
+
+ @Override
+ protected int defaultTimeoutSeconds() {
+ return 2 * 60;
+ }
+
+  // Bounds of the generated key space. Rows, column families, column qualifiers and timestamps
+  // are each numbered from 0 (inclusive) up to the matching limit (exclusive).
+  private static final int ROW_LIMIT = 100;
+  private static final int CF_LIMIT = 5;
+  private static final int CQ_LIMIT = 5;
+  private static final int TS_LIMIT = 1;
+
+  /**
+   * Creates one table without extra splits and one pre-split table, loads the same generated data
+   * into each, and runs the full set of range scans against both.
+   */
+  @Test
+  public void run() throws Exception {
+    Connector conn = getConnector();
+    String[] names = getUniqueNames(2);
+    String plainTable = names[0];
+    conn.tableOperations().create(plainTable);
+    String splitTable = names[1];
+    conn.tableOperations().create(splitTable);
+
+    // Add a split point at every multiple of (ROW_LIMIT / splitCount) that is below ROW_LIMIT.
+    int splitCount = 3;
+    int stride = ROW_LIMIT / splitCount;
+    TreeSet<Text> splitPoints = new TreeSet<Text>();
+    int row = stride;
+    while (row < ROW_LIMIT) {
+      splitPoints.add(createRow(row));
+      row += stride;
+    }
+    conn.tableOperations().addSplits(splitTable, splitPoints);
+
+    insertData(conn, plainTable);
+    scanTable(conn, plainTable);
+
+    insertData(conn, splitTable);
+    scanTable(conn, splitTable);
+  }
+
+ private void scanTable(Connector c, String table) throws Exception {
+ scanRange(c, table, new IntKey(0, 0, 0, 0), new IntKey(1, 0, 0, 0));
+
+ scanRange(c, table, new IntKey(0, 0, 0, 0), new IntKey(ROW_LIMIT - 1, CF_LIMIT - 1, CQ_LIMIT - 1, 0));
+
+ scanRange(c, table, null, null);
+
+ for (int i = 0; i < ROW_LIMIT; i += (ROW_LIMIT / 3)) {
+ for (int j = 0; j < CF_LIMIT; j += (CF_LIMIT / 2)) {
+ for (int k = 1; k < CQ_LIMIT; k += (CQ_LIMIT / 2)) {
+ scanRange(c, table, null, new IntKey(i, j, k, 0));
+ scanRange(c, table, new IntKey(0, 0, 0, 0), new IntKey(i, j, k, 0));
+
+ scanRange(c, table, new IntKey(i, j, k, 0), new IntKey(ROW_LIMIT - 1, CF_LIMIT - 1, CQ_LIMIT - 1, 0));
+
+ scanRange(c, table, new IntKey(i, j, k, 0), null);
+
+ }
+ }
+ }
+
+ for (int i = 0; i < ROW_LIMIT; i++) {
+ scanRange(c, table, new IntKey(i, 0, 0, 0), new IntKey(i, CF_LIMIT - 1, CQ_LIMIT - 1, 0));
+
+ if (i > 0 && i < ROW_LIMIT - 1) {
+ scanRange(c, table, new IntKey(i - 1, 0, 0, 0), new IntKey(i + 1, CF_LIMIT - 1, CQ_LIMIT - 1, 0));
+ }
+ }
+
+ }
+
+  /**
+   * Integer coordinates (row, column family, column qualifier, timestamp) of one generated entry.
+   * {@link #createKey()} turns the coordinates into an Accumulo {@link Key}; {@link #increment()}
+   * yields the coordinates that follow in the generated data.
+   */
+  private static class IntKey {
+    private int row;
+    private int cf;
+    private int cq;
+    private long ts;
+
+    /** Creates a key from explicit coordinates. */
+    IntKey(int row, int cf, int cq, long ts) {
+      this.row = row;
+      this.cf = cf;
+      this.cq = cq;
+      this.ts = ts;
+    }
+
+    /** Copy constructor. */
+    IntKey(IntKey ik) {
+      this(ik.row, ik.cf, ik.cq, ik.ts);
+    }
+
+    /** Builds the Accumulo key for these coordinates using the shared text formatters. */
+    Key createKey() {
+      return new Key(createRow(row), createCF(cf), createCQ(cq), ts);
+    }
+
+    /**
+     * Returns a new key advanced by one position; this instance is left untouched. The timestamp
+     * is bumped first, and each field that reaches its limit wraps to zero and carries into the
+     * next field (timestamp, then qualifier, then family, then row).
+     */
+    IntKey increment() {
+      IntKey next = new IntKey(this);
+
+      if (++next.ts < TS_LIMIT) {
+        return next;
+      }
+      next.ts = 0;
+
+      if (++next.cq < CQ_LIMIT) {
+        return next;
+      }
+      next.cq = 0;
+
+      if (++next.cf < CF_LIMIT) {
+        return next;
+      }
+      next.cf = 0;
+
+      next.row++;
+      return next;
+    }
+
+  }
+
+ private void scanRange(Connector c, String table, IntKey ik1, IntKey ik2) throws Exception {
+ scanRange(c, table, ik1, false, ik2, false);
+ scanRange(c, table, ik1, false, ik2, true);
+ scanRange(c, table, ik1, true, ik2, false);
+ scanRange(c, table, ik1, true, ik2, true);
+ }
+
+  /**
+   * Scans one range and verifies that the scanner returns exactly the generated keys that fall
+   * inside it, in order.
+   *
+   * @param ik1
+   *          start of the range, or {@code null} for no lower bound
+   * @param inclusive1
+   *          whether the start key itself is part of the range
+   * @param ik2
+   *          end of the range, or {@code null} for no upper bound
+   * @param inclusive2
+   *          whether the end key itself is part of the range
+   * @throws Exception
+   *           if a returned key differs from the expected one, or the scan stops early or late
+   */
+  private void scanRange(Connector c, String table, IntKey ik1, boolean inclusive1, IntKey ik2, boolean inclusive2) throws Exception {
+    Scanner scanner = c.createScanner(table, Authorizations.EMPTY);
+
+    // First key the scan should return; with no start key that is the very first generated key.
+    Key startKey = null;
+    IntKey cursor = new IntKey(0, 0, 0, 0);
+    if (ik1 != null) {
+      startKey = ik1.createKey();
+      cursor = inclusive1 ? ik1 : ik1.increment();
+    }
+
+    // Key just past the last one the scan should return; with no end key that is row ROW_LIMIT.
+    Key endKey = null;
+    IntKey stopAt = new IntKey(ROW_LIMIT, 0, 0, 0);
+    if (ik2 != null) {
+      endKey = ik2.createKey();
+      stopAt = inclusive2 ? ik2.increment() : ik2;
+    }
+
+    scanner.setRange(new Range(startKey, inclusive1, endKey, inclusive2));
+
+    for (Entry<Key,Value> entry : scanner) {
+      Key expected = cursor.createKey();
+      Key actual = entry.getKey();
+      if (!expected.equals(actual)) {
+        throw new Exception(" " + expected + " != " + actual);
+      }
+      cursor = cursor.increment();
+    }
+
+    // After the last entry the cursor must sit exactly on the computed stop position.
+    Key reached = cursor.createKey();
+    Key wanted = stopAt.createKey();
+    if (!reached.equals(wanted)) {
+      throw new Exception(" " + reached + " != " + wanted);
+    }
+  }
+
+  /** Formats a column family number as {@code cf_NNN} (zero-padded to three digits). */
+  private static Text createCF(int cf) {
+    return new Text(String.format("cf_%03d", cf));
+  }
+
+  /**
+   * Formats a column qualifier number as {@code cq_NNN} (zero-padded to three digits).
+   *
+   * @param cq
+   *          column qualifier number
+   */
+  private static Text createCQ(int cq) {
+    Text tcq = new Text(String.format("cq_%03d", cq));
+    return tcq;
+  }
+
+  /** Formats a row number as {@code r_NNNNNN} (zero-padded to six digits). */
+  private static Text createRow(int row) {
+    return new Text(String.format("r_%06d", row));
+  }
+
+  /**
+   * Writes the full generated data set to {@code table}: one mutation per row, holding an entry for
+   * every (column family, column qualifier, timestamp) combination below the configured limits.
+   * Each value encodes its own coordinates.
+   */
+  private void insertData(Connector c, String table) throws Exception {
+    BatchWriter writer = c.createBatchWriter(table, new BatchWriterConfig());
+
+    for (int row = 0; row < ROW_LIMIT; row++) {
+      Mutation mutation = new Mutation(createRow(row));
+
+      for (int cf = 0; cf < CF_LIMIT; cf++) {
+        for (int cq = 0; cq < CQ_LIMIT; cq++) {
+          for (int ts = 0; ts < TS_LIMIT; ts++) {
+            String payload = String.format("%06d_%03d_%03d_%03d", row, cf, cq, ts);
+            mutation.put(createCF(cf), createCQ(cq), ts, new Value(payload.getBytes(UTF_8)));
+          }
+        }
+      }
+
+      writer.addMutation(mutation);
+    }
+
+    writer.close();
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
new file mode 100644
index 0000000..0636056
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.InstanceOperations;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ScanSessionTimeOutIT extends AccumuloClusterHarness {
+  private static final Logger log = LoggerFactory.getLogger(ScanSessionTimeOutIT.class);
+
+  /**
+   * Sets the tablet server session max-idle property to "3" in the mini cluster's site
+   * configuration.
+   */
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    final String idleKey = Property.TSERV_SESSION_MAXIDLE.getKey();
+    Map<String,String> site = cfg.getSiteConfig();
+    site.put(idleKey, "3");
+    cfg.setSiteConfig(site);
+  }
+
+ @Override
+ protected int defaultTimeoutSeconds() {
+ return 60;
+ }
+
+ private String sessionIdle = null;
+
+ @Before
+ public void reduceSessionIdle() throws Exception {
+ InstanceOperations ops = getConnector().instanceOperations();
+ sessionIdle = ops.getSystemConfiguration().get(Property.TSERV_SESSION_MAXIDLE.getKey());
+ ops.setProperty(Property.TSERV_SESSION_MAXIDLE.getKey(), "3");
+ log.info("Waiting for existing session idle time to expire");
+ Thread.sleep(AccumuloConfiguration.getTimeInMillis(sessionIdle));
+ log.info("Finished waiting");
+ }
+
+  /**
+   * Restores the session max-idle setting captured by {@code reduceSessionIdle()}, if any.
+   */
+  @After
+  public void resetSessionIdle() throws Exception {
+    if (sessionIdle == null) {
+      // nothing was captured, so there is nothing to restore
+      return;
+    }
+    getConnector().instanceOperations().setProperty(Property.TSERV_SESSION_MAXIDLE.getKey(), sessionIdle);
+  }
+
+ @Test
+ public void run() throws Exception {
+ Connector c = getConnector();
+ String tableName = getUniqueNames(1)[0];
+ c.tableOperations().create(tableName);
+
+ BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
+
+ for (int i = 0; i < 100000; i++) {
+ Mutation m = new Mutation(new Text(String.format("%08d", i)));
+ for (int j = 0; j < 3; j++)
+ m.put(new Text("cf1"), new Text("cq" + j), new Value((i + "_" + j).getBytes(UTF_8)));
+
+ bw.addMutation(m);
+ }
+
+ bw.close();
+
+ Scanner scanner = c.createScanner(tableName, new Authorizations());
+ scanner.setBatchSize(1000);
+
+ Iterator<Entry<Key,Value>> iter = scanner.iterator();
+
+ verify(iter, 0, 200);
+
+ // sleep three times the session timeout
+ UtilWaitThread.sleep(9000);
+
+ verify(iter, 200, 100000);
+
+ }
+
+ private void verify(Iterator<Entry<Key,Value>> iter, int start, int stop) throws Exception {
+ for (int i = start; i < stop; i++) {
+
+ Text er = new Text(String.format("%08d", i));
+
+ for (int j = 0; j < 3; j++) {
+ Entry<Key,Value> entry = iter.next();
+
+ if (!entry.getKey().getRow().equals(er)) {
+ throw new Exception("row " + entry.getKey().getRow() + " != " + er);
+ }
+
+ if (!entry.getKey().getColumnFamily().equals(new Text("cf1"))) {
+ throw new Exception("cf " + entry.getKey().getColumnFamily() + " != cf1");
+ }
+
+ if (!entry.getKey().getColumnQualifier().equals(new Text("cq" + j))) {
+ throw new Exception("cq " + entry.getKey().getColumnQualifier() + " != cq" + j);
+ }
+
+ if (!entry.getValue().toString().equals("" + i + "_" + j)) {
+ throw new Exception("value " + entry.getValue() + " != " + i + "_" + j);
+ }
+
+ }
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
new file mode 100644
index 0000000..340a58e
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.base.Stopwatch;
+
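+/**
+ * Checks that a scanner configured for immediate readahead spends less time waiting for results than one whose readahead never starts.
+ */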
+public class ScannerIT extends AccumuloClusterHarness {
+
+ @Override
+ protected int defaultTimeoutSeconds() {
+ return 60;
+ }
+
+  /**
+   * Scans the same ten-entry row twice through a {@link SlowIterator} with a batch size of one:
+   * first with a readahead threshold that is never reached, then with a threshold of zero. The
+   * time the client spends waiting in {@code hasNext()} must be lower in the second run.
+   */
+  @Test
+  public void testScannerReadaheadConfiguration() throws Exception {
+    final String table = getUniqueNames(1)[0];
+    Connector c = getConnector();
+    c.tableOperations().create(table);
+
+    BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig());
+
+    Mutation m = new Mutation("a");
+    for (int i = 0; i < 10; i++) {
+      m.put(Integer.toString(i), "", "");
+    }
+
+    bw.addMutation(m);
+    bw.close();
+
+    IteratorSetting cfg = new IteratorSetting(100, SlowIterator.class);
+    // A batch size of one will end up calling seek() for each element with no calls to next()
+    SlowIterator.setSeekSleepTime(cfg, 100L);
+
+    Scanner s = c.createScanner(table, new Authorizations());
+    s.addScanIterator(cfg);
+    // Never start readahead
+    s.setReadaheadThreshold(Long.MAX_VALUE);
+    s.setBatchSize(1);
+    s.setRange(new Range());
+
+    long millisWithWait = timeSpentWaitingForResults(s);
+
+    s = c.createScanner(table, new Authorizations());
+    s.addScanIterator(cfg);
+    s.setRange(new Range());
+    s.setBatchSize(1);
+    // Start readahead immediately
+    s.setReadaheadThreshold(0L);
+
+    long millisWithNoWait = timeSpentWaitingForResults(s);
+
+    // The "no-wait" time should be much less than the "wait-time"
+    Assert.assertTrue("Expected less time to be taken with immediate readahead (" + millisWithNoWait + ") than without immediate readahead (" + millisWithWait
+        + ")", millisWithNoWait < millisWithWait);
+  }
+
+  /**
+   * Iterates over every entry of {@code s} and measures how long the client is blocked in
+   * {@code hasNext()}. The stopwatch is paused while the client sleeps for 100 ms (simulated
+   * work) and calls {@code next()}, so only the wait for the next result is counted.
+   *
+   * @return total time spent in {@code hasNext()}, in milliseconds
+   */
+  private static long timeSpentWaitingForResults(Scanner s) {
+    Stopwatch sw = new Stopwatch();
+    Iterator<Entry<Key,Value>> iterator = s.iterator();
+
+    sw.start();
+    while (iterator.hasNext()) {
+      sw.stop();
+
+      // Simulated client-side work between results; not included in the measurement
+      UtilWaitThread.sleep(100L);
+      iterator.next();
+      sw.start();
+    }
+    sw.stop();
+
+    return sw.elapsed(TimeUnit.MILLISECONDS);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/ServerSideErrorIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ServerSideErrorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ServerSideErrorIT.java
new file mode 100644
index 0000000..02b65f4
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ServerSideErrorIT.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.util.Collections;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Combiner;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class ServerSideErrorIT extends AccumuloClusterHarness {
+
+ @Override
+ protected int defaultTimeoutSeconds() {
+ return 2 * 60;
+ }
+
+  /**
+   * Attaches a {@code BadCombiner} to a table and checks that both a scan and a batch scan fail.
+   * Then removes the table properties, checks that scanning works again, and finally checks that
+   * a scan configured with a non-existent iterator class fails.
+   */
+  @Test
+  public void run() throws Exception {
+    Connector c = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    IteratorSetting is = new IteratorSetting(5, "Bad Aggregator", BadCombiner.class);
+    Combiner.setColumns(is, Collections.singletonList(new IteratorSetting.Column("acf")));
+    c.tableOperations().attachIterator(tableName, is);
+
+    BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
+
+    Mutation m = new Mutation(new Text("r1"));
+    m.put(new Text("acf"), new Text("foo"), new Value(new byte[] {'1'}));
+
+    bw.addMutation(m);
+
+    bw.close();
+
+    // try to scan table
+    Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY);
+    if (!scanFails(scanner))
+      throw new Exception("Scan did not fail");
+
+    // try to batch scan the table
+    BatchScanner bs = c.createBatchScanner(tableName, Authorizations.EMPTY, 2);
+    bs.setRanges(Collections.singleton(new Range()));
+
+    boolean caught;
+    try {
+      caught = scanFails(bs);
+    } finally {
+      bs.close();
+    }
+
+    if (!caught)
+      throw new Exception("batch scan did not fail");
+
+    // remove the bad agg so accumulo can shutdown
+    TableOperations to = c.tableOperations();
+    for (Entry<String,String> e : to.getProperties(tableName)) {
+      to.removeProperty(tableName, e.getKey());
+    }
+
+    UtilWaitThread.sleep(500);
+
+    // should be able to scan now
+    scanner = c.createScanner(tableName, Authorizations.EMPTY);
+    for (Entry<Key,Value> entry : scanner) {
+      entry.getKey();
+    }
+
+    // set a non-existent iterator, should cause scan to fail on server side
+    scanner.addScanIterator(new IteratorSetting(100, "bogus", "com.bogus.iterator"));
+
+    if (!scanFails(scanner))
+      throw new Exception("Scan did not fail");
+  }
+
+  /**
+   * Reads every entry from {@code entries} and reports whether doing so threw an exception.
+   *
+   * @return {@code true} if iteration threw, {@code false} if it completed normally
+   */
+  private static boolean scanFails(Iterable<Entry<Key,Value>> entries) {
+    try {
+      for (Entry<Key,Value> entry : entries) {
+        entry.getKey();
+      }
+      return false;
+    } catch (Exception expected) {
+      // a failure is the outcome the caller is checking for, so it is reported rather than rethrown
+      return true;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java
new file mode 100644
index 0000000..36bdd7a
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriter.Status;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Durability;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Condition;
+import org.apache.accumulo.core.data.ConditionalMutation;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class SessionDurabilityIT extends ConfigurableMacBase {
+
+  /**
+   * Configures the mini cluster with a single tablet server, the raw local file system for
+   * {@code fs.file.impl}, and a 5s ZooKeeper timeout.
+   */
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    final String rawLocalFs = RawLocalFileSystem.class.getName();
+
+    cfg.setNumTservers(1);
+    hadoopCoreSite.set("fs.file.impl", rawLocalFs);
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
+  }
+
+  /**
+   * Writes made with SYNC durability to a table whose durability is "none" must still be present
+   * after the tablet server is restarted.
+   */
+  @Test(timeout = 3 * 60 * 1000)
+  public void nondurableTableHasDurableWrites() throws Exception {
+    final int written = 10;
+    Connector conn = getConnector();
+    String table = getUniqueNames(1)[0];
+
+    // the table itself asks for no durability
+    conn.tableOperations().create(table);
+    conn.tableOperations().setProperty(table, Property.TABLE_DURABILITY.getKey(), "none");
+
+    // the writer overrides that with durable writes
+    BatchWriterConfig writerConfig = new BatchWriterConfig();
+    writerConfig.setDurability(Durability.SYNC);
+    writeSome(table, written, writerConfig);
+    assertEquals(written, count(table));
+
+    // the writes must survive a restart
+    restartTServer();
+    assertEquals(written, count(table));
+  }
+
+  /**
+   * Writes made with NONE durability to a table whose durability is "sync" must not all be present
+   * after the tablet server is restarted.
+   */
+  @Test(timeout = 3 * 60 * 1000)
+  public void durableTableLosesNonDurableWrites() throws Exception {
+    final int written = 10;
+    Connector conn = getConnector();
+    String table = getUniqueNames(1)[0];
+
+    // the table itself asks for durable writes
+    conn.tableOperations().create(table);
+    conn.tableOperations().setProperty(table, Property.TABLE_DURABILITY.getKey(), "sync");
+
+    // the writer overrides that with no durability
+    BatchWriterConfig writerConfig = new BatchWriterConfig();
+    writerConfig.setDurability(Durability.NONE);
+    writeSome(table, written, writerConfig);
+
+    // after a restart fewer entries than were written must remain
+    restartTServer();
+    assertTrue(written > count(table));
+  }
+
+  /** Returns the number of entries a scan of {@code tableName} with empty authorizations yields. */
+  private int count(String tableName) throws Exception {
+    Connector conn = getConnector();
+    return Iterators.size(conn.createScanner(tableName, Authorizations.EMPTY).iterator());
+  }
+
+  /**
+   * Writes {@code n} single-entry rows, keyed "0" to "n-1", to {@code tableName} using a batch
+   * writer built from {@code cfg}.
+   */
+  private void writeSome(String tableName, int n, BatchWriterConfig cfg) throws Exception {
+    BatchWriter writer = getConnector().createBatchWriter(tableName, cfg);
+    int row = 0;
+    while (row < n) {
+      Mutation mutation = new Mutation(row + "");
+      mutation.put("", "", "");
+      writer.addMutation(mutation);
+      row++;
+    }
+    writer.close();
+  }
+
+  /**
+   * Conditional writes made with NONE durability to a table whose durability is "sync" are
+   * visible right away but must be gone after the tablet server is restarted.
+   */
+  @Test(timeout = 3 * 60 * 1000)
+  public void testConditionDurability() throws Exception {
+    final int written = 10;
+    Connector conn = getConnector();
+    String table = getUniqueNames(1)[0];
+
+    // the table itself asks for durable writes
+    conn.tableOperations().create(table);
+    conn.tableOperations().setProperty(table, Property.TABLE_DURABILITY.getKey(), "sync");
+
+    // the conditional writer overrides that with no durability
+    ConditionalWriterConfig writerConfig = new ConditionalWriterConfig();
+    writerConfig.setDurability(Durability.NONE);
+    conditionWriteSome(table, written, writerConfig);
+
+    // everything is readable before the restart
+    assertEquals(written, count(table));
+
+    // and lost after it
+    restartTServer();
+    assertEquals(0, count(table));
+  }
+
+  /**
+   * Conditional writes made with SYNC durability to a table whose durability is "none" must still
+   * be present after the tablet server is restarted.
+   */
+  @Test(timeout = 3 * 60 * 1000)
+  public void testConditionDurability2() throws Exception {
+    final int written = 10;
+    Connector conn = getConnector();
+    String table = getUniqueNames(1)[0];
+
+    // the table itself asks for no durability
+    conn.tableOperations().create(table);
+    conn.tableOperations().setProperty(table, Property.TABLE_DURABILITY.getKey(), "none");
+
+    // the conditional writer overrides that with durable writes
+    ConditionalWriterConfig writerConfig = new ConditionalWriterConfig();
+    writerConfig.setDurability(Durability.SYNC);
+    conditionWriteSome(table, written, writerConfig);
+
+    // everything is readable before the restart
+    assertEquals(written, count(table));
+
+    // and still there after it
+    restartTServer();
+    assertEquals(written, count(table));
+  }
+
+  /**
+   * Writes {@code n} rows, keyed "0" to "n-1", through a conditional writer built from
+   * {@code cfg}, asserting that every conditional mutation is accepted. The writer is closed
+   * before returning, even when an assertion fails.
+   */
+  private void conditionWriteSome(String tableName, int n, ConditionalWriterConfig cfg) throws Exception {
+    Connector c = getConnector();
+    ConditionalWriter cw = c.createConditionalWriter(tableName, cfg);
+    try {
+      for (int i = 0; i < n; i++) {
+        // the cast selects the CharSequence row overload of the varargs constructor
+        ConditionalMutation m = new ConditionalMutation((CharSequence) (i + ""), new Condition("", ""));
+        m.put("", "", "X");
+        assertEquals(Status.ACCEPTED, cw.write(m).getStatus());
+      }
+    } finally {
+      // release the writer's resources; the original never closed it
+      cw.close();
+    }
+  }
+
+ private void restartTServer() throws Exception {
+ for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+ cluster.killProcess(ServerType.TABLET_SERVER, proc);
+ }
+ cluster.start();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/ShutdownIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ShutdownIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ShutdownIT.java
new file mode 100644
index 0000000..f27ee02
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ShutdownIT.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.server.util.Admin;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.TestRandomDeletes;
+import org.apache.accumulo.test.VerifyIngest;
+import org.junit.Test;
+
+public class ShutdownIT extends ConfigurableMacBase {
+
+ @Override
+ protected int defaultTimeoutSeconds() {
+ return 2 * 60;
+ }
+
+  /**
+   * Starts an ingest process, waits briefly, and checks that {@code Admin stopAll} exits with
+   * status 0 while the ingest is in flight. The ingest process is always destroyed, including
+   * when the assertion fails.
+   */
+  @Test
+  public void shutdownDuringIngest() throws Exception {
+    Process ingest = cluster.exec(TestIngest.class, "-i", cluster.getInstanceName(), "-z", cluster.getZooKeepers(), "-u", "root", "-p", ROOT_PASSWORD,
+        "--createTable");
+    try {
+      UtilWaitThread.sleep(100);
+      assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
+    } finally {
+      // do not leave the child process behind if the shutdown check fails
+      ingest.destroy();
+    }
+  }
+
+  /**
+   * Ingests data, starts a verification process, waits briefly, and checks that
+   * {@code Admin stopAll} exits with status 0 while the verification is in flight. The
+   * verification process is always destroyed, including when the assertion fails.
+   */
+  @Test
+  public void shutdownDuringQuery() throws Exception {
+    assertEquals(0,
+        cluster.exec(TestIngest.class, "-i", cluster.getInstanceName(), "-z", cluster.getZooKeepers(), "-u", "root", "-p", ROOT_PASSWORD, "--createTable")
+            .waitFor());
+    Process verify = cluster.exec(VerifyIngest.class, "-i", cluster.getInstanceName(), "-z", cluster.getZooKeepers(), "-u", "root", "-p", ROOT_PASSWORD);
+    try {
+      UtilWaitThread.sleep(100);
+      assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
+    } finally {
+      // do not leave the child process behind if the shutdown check fails
+      verify.destroy();
+    }
+  }
+
+  /**
+   * Ingests data, starts a random-delete process, waits briefly, and checks that
+   * {@code Admin stopAll} exits with status 0 while the deletes are in flight. The delete
+   * process is always destroyed, including when the assertion fails.
+   */
+  @Test
+  public void shutdownDuringDelete() throws Exception {
+    assertEquals(0,
+        cluster.exec(TestIngest.class, "-i", cluster.getInstanceName(), "-z", cluster.getZooKeepers(), "-u", "root", "-p", ROOT_PASSWORD, "--createTable")
+            .waitFor());
+    Process deleter = cluster.exec(TestRandomDeletes.class, "-i", cluster.getInstanceName(), "-z", cluster.getZooKeepers(), "-u", "root", "-p", ROOT_PASSWORD);
+    try {
+      UtilWaitThread.sleep(100);
+      assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
+    } finally {
+      // do not leave the child process behind if the shutdown check fails
+      deleter.destroy();
+    }
+  }
+
+  /**
+   * Creates ten tables, starts deleting them on a background thread, and checks that
+   * {@code Admin stopAll} exits with status 0 shortly afterwards. Any exception the background
+   * thread has recorded by then is rethrown; the thread itself is not joined.
+   */
+  @Test
+  public void shutdownDuringDeleteTable() throws Exception {
+    final Connector conn = getConnector();
+    for (int n = 0; n < 10; n++) {
+      conn.tableOperations().create("table" + n);
+    }
+
+    final AtomicReference<Exception> failure = new AtomicReference<Exception>();
+    Thread deleter = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          for (int n = 0; n < 10; n++) {
+            conn.tableOperations().delete("table" + n);
+          }
+        } catch (Exception ex) {
+          // hand the failure to the test thread
+          failure.set(ex);
+        }
+      }
+    });
+    deleter.start();
+
+    UtilWaitThread.sleep(100);
+    assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
+
+    Exception recorded = failure.get();
+    if (recorded != null) {
+      throw recorded;
+    }
+  }
+
+ @Test
+ public void stopDuringStart() throws Exception {
+ assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
+ }
+
+ @Test
+ public void adminStop() throws Exception {
+ runAdminStopTest(getConnector(), cluster);
+ }
+
+  /**
+   * Ingests data, expects two tablet servers, stops the first one listed with {@code Admin stop},
+   * and checks that exactly one tablet server remains and that it is not the one that was
+   * stopped.
+   */
+  static void runAdminStopTest(Connector c, MiniAccumuloClusterImpl cluster) throws InterruptedException, IOException {
+    Process ingest = cluster.exec(TestIngest.class, "-i", cluster.getInstanceName(), "-z", cluster.getZooKeepers(), "-u", "root", "-p", ROOT_PASSWORD,
+        "--createTable");
+    assertEquals(0, ingest.waitFor());
+
+    List<String> before = c.instanceOperations().getTabletServers();
+    assertEquals(2, before.size());
+
+    String doomed = before.get(0);
+    Process stop = cluster.exec(Admin.class, "stop", doomed);
+    assertEquals(0, stop.waitFor());
+
+    List<String> after = c.instanceOperations().getTabletServers();
+    assertEquals(1, after.size());
+    assertFalse(after.get(0).equals(doomed));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java
new file mode 100644
index 0000000..3fcbcfb
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.impl.Credentials;
+import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.master.thrift.MasterClientService;
+import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
+import org.apache.accumulo.core.master.thrift.TableInfo;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.trace.Tracer;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.MemoryUnit;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+ /**
+  * Verifies that online tablets end up spread across the tablet servers: after ingesting into one table (with a small split threshold) and
+  * pre-splitting a second, unused table, the per-server online tablet counts reported by the master may differ from the first server's count by
+  * at most the number of tablet servers.
+  */
+ public class SimpleBalancerFairnessIT extends ConfigurableMacBase {
+
+   /**
+    * Sets {@code TSERV_MAXMEM} to 10K and {@code TSERV_MAJC_DELAY} to 0, and triples the configured tablet server memory.
+    */
+   @Override
+   public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+     cfg.setProperty(Property.TSERV_MAXMEM, "10K");
+     cfg.setProperty(Property.TSERV_MAJC_DELAY, "0");
+     cfg.setMemory(ServerType.TABLET_SERVER, cfg.getMemory(ServerType.TABLET_SERVER) * 3, MemoryUnit.BYTE);
+   }
+
+   /**
+    * @return the test timeout: ten minutes, in seconds
+    */
+   @Override
+   protected int defaultTimeoutSeconds() {
+     return 10 * 60;
+   }
+
+   /**
+    * Creates the tables, ingests, waits for all tablets to be assigned, then compares the online tablet counts of the tablet servers.
+    *
+    * @throws Exception
+    *           on any client, master-connection, or assertion failure
+    */
+   @Test
+   public void simpleBalancerFairness() throws Exception {
+     Connector c = getConnector();
+     c.tableOperations().create("test_ingest");
+     c.tableOperations().setProperty("test_ingest", Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K");
+     c.tableOperations().create("unused");
+     TreeSet<Text> splits = TestIngest.getSplitPoints(0, 10000000, 500);
+     log.info("Creating " + splits.size() + " splits");
+     c.tableOperations().addSplits("unused", splits);
+     List<String> tservers = c.instanceOperations().getTabletServers();
+     TestIngest.Opts opts = new TestIngest.Opts();
+     opts.rows = 50000;
+     opts.setPrincipal("root");
+     TestIngest.ingest(c, opts, new BatchWriterOpts());
+     c.tableOperations().flush("test_ingest", null, null, false);
+     // NOTE(review): fixed pause, presumably to let splits and balancing settle before sampling the master - confirm
+     UtilWaitThread.sleep(45 * 1000);
+     Credentials creds = new Credentials("root", new PasswordToken(ROOT_PASSWORD));
+     ClientContext context = new ClientContext(c.getInstance(), creds, getClientConfig());
+
+     // Poll the master up to 10 times, 3 seconds apart, until it reports no unassigned tablets
+     MasterMonitorInfo stats = null;
+     int unassignedTablets = 1;
+     for (int i = 0; unassignedTablets > 0 && i < 10; i++) {
+       MasterClientService.Iface client = null;
+       try {
+         client = MasterClient.getConnectionWithRetry(context);
+         stats = client.getMasterStats(Tracer.traceInfo(), creds.toThrift(c.getInstance()));
+       } finally {
+         if (client != null)
+           MasterClient.close(client);
+       }
+       unassignedTablets = stats.getUnassignedTablets();
+       if (unassignedTablets > 0) {
+         log.info("Found " + unassignedTablets + " unassigned tablets, sleeping 3 seconds for tablet assignment");
+         Thread.sleep(3000);
+       }
+     }
+
+     assertEquals("Unassigned tablets were not assigned within 30 seconds", 0, unassignedTablets);
+
+     // Compute online tablets per tserver
+     List<Integer> counts = new ArrayList<Integer>();
+     for (TabletServerStatus server : stats.tServerInfo) {
+       int count = 0;
+       for (TableInfo table : server.tableMap.values()) {
+         count += table.onlineTablets;
+       }
+       counts.add(count);
+     }
+     assertTrue("Expected to have at least two TabletServers", counts.size() > 1);
+
+     // The allowed imbalance is the number of tablet servers listed before ingest; report that same bound in the failure message
+     final int maxDiff = tservers.size();
+     for (int i = 1; i < counts.size(); i++) {
+       int diff = Math.abs(counts.get(0) - counts.get(i));
+       assertTrue("Expected difference in tablets to be less than or equal to " + maxDiff + " but was " + diff + ". Counts " + counts, diff <= maxDiff);
+     }
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/SparseColumnFamilyIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SparseColumnFamilyIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SparseColumnFamilyIT.java
new file mode 100644
index 0000000..8cece0b
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SparseColumnFamilyIT.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+/**
+ * This test recreates issue ACCUMULO-516. Until that issue is fixed this test should time out.
+ */
+public class SparseColumnFamilyIT extends AccumuloClusterHarness {
+
+ @Override
+ protected int defaultTimeoutSeconds() {
+ return 60;
+ }
+
+ @Test
+ public void sparceColumnFamily() throws Exception {
+ String scftt = getUniqueNames(1)[0];
+ Connector c = getConnector();
+ c.tableOperations().create(scftt);
+
+ BatchWriter bw = c.createBatchWriter(scftt, new BatchWriterConfig());
+
+ // create file in the tablet that has mostly column family 0, with a few entries for column family 1
+
+ bw.addMutation(nm(0, 1, 0));
+ for (int i = 1; i < 99999; i++) {
+ bw.addMutation(nm(i * 2, 0, i));
+ }
+ bw.addMutation(nm(99999 * 2, 1, 99999));
+ bw.flush();
+
+ c.tableOperations().flush(scftt, null, null, true);
+
+ // create a file that has column family 1 and 0 interleaved
+ for (int i = 0; i < 100000; i++) {
+ bw.addMutation(nm(i * 2 + 1, i % 2 == 0 ? 0 : 1, i));
+ }
+ bw.close();
+
+ c.tableOperations().flush(scftt, null, null, true);
+
+ Scanner scanner = c.createScanner(scftt, Authorizations.EMPTY);
+
+ for (int i = 0; i < 200; i++) {
+
+ // every time we search for column family 1, it will scan the entire file
+ // that has mostly column family 0 until the bug is fixed
+ scanner.setRange(new Range(String.format("%06d", i), null));
+ scanner.clearColumns();
+ scanner.setBatchSize(3);
+ scanner.fetchColumnFamily(new Text(String.format("%03d", 1)));
+
+ Iterator<Entry<Key,Value>> iter = scanner.iterator();
+ if (iter.hasNext()) {
+ Entry<Key,Value> entry = iter.next();
+ if (!"001".equals(entry.getKey().getColumnFamilyData().toString())) {
+ throw new Exception();
+ }
+ }
+ }
+ }
+
+ private Mutation nm(int row, int cf, int val) {
+ Mutation m = new Mutation(String.format("%06d", row));
+ m.put(String.format("%03d", cf), "", "" + val);
+ return m;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
new file mode 100644
index 0000000..49cd2aa
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.cluster.ClusterUser;
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.InstanceOperations;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.util.CheckForMetadataProblems;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.VerifyIngest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+
+ /**
+  * Integration tests that provoke tablet splits (through bulk ingest, interleaved reads and writes, and deletes) and then check the number of
+  * splits and the tablet metadata. The tests are skipped unless the cluster type is MINI, see {@link #alterConfig()}.
+  */
+ public class SplitIT extends AccumuloClusterHarness {
+   private static final Logger log = LoggerFactory.getLogger(SplitIT.class);
+
+   /**
+    * Sets {@code TSERV_MAXMEM} to 5K and {@code TSERV_MAJC_DELAY} to 100ms on the mini cluster configuration.
+    */
+   @Override
+   public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+     cfg.setProperty(Property.TSERV_MAXMEM, "5K");
+     cfg.setProperty(Property.TSERV_MAJC_DELAY, "100ms");
+   }
+
+   /**
+    * @return the test timeout: four minutes, in seconds
+    */
+   @Override
+   protected int defaultTimeoutSeconds() {
+     return 4 * 60;
+   }
+
+   // System configuration values read by alterConfig(); resetConfig() restores them and sets the fields back to null
+   private String tservMaxMem, tservMajcDelay;
+
+   /**
+    * Records the current values of {@code TSERV_MAXMEM} and {@code TSERV_MAJC_DELAY} and, where they differ, sets them to 5K and 100ms. Changing
+    * the memory setting restarts all tablet servers; otherwise the method sleeps for the previously configured major compaction delay.
+    *
+    * @throws Exception
+    *           on any client or cluster-control failure
+    */
+   @Before
+   public void alterConfig() throws Exception {
+     Assume.assumeTrue(ClusterType.MINI == getClusterType());
+
+     InstanceOperations iops = getConnector().instanceOperations();
+     Map<String,String> config = iops.getSystemConfiguration();
+     tservMaxMem = config.get(Property.TSERV_MAXMEM.getKey());
+     tservMajcDelay = config.get(Property.TSERV_MAJC_DELAY.getKey());
+
+     if (!tservMajcDelay.equals("100ms")) {
+       iops.setProperty(Property.TSERV_MAJC_DELAY.getKey(), "100ms");
+     }
+
+     // Property.TSERV_MAXMEM can't be altered on a running server
+     boolean restarted = false;
+     if (!tservMaxMem.equals("5K")) {
+       iops.setProperty(Property.TSERV_MAXMEM.getKey(), "5K");
+       getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+       getCluster().getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+       restarted = true;
+     }
+
+     // If we restarted the tservers, we don't need to re-wait for the majc delay
+     if (!restarted) {
+       long millis = AccumuloConfiguration.getTimeInMillis(tservMajcDelay);
+       log.info("Waiting for majc delay period: {}ms", millis);
+       Thread.sleep(millis);
+       log.info("Finished waiting for majc delay period");
+     }
+   }
+
+   /**
+    * Restores the property values recorded by {@link #alterConfig()}. Restoring {@code TSERV_MAXMEM} restarts all tablet servers. Nothing is
+    * done for a value that was never recorded.
+    *
+    * @throws Exception
+    *           on any client or cluster-control failure
+    */
+   @After
+   public void resetConfig() throws Exception {
+     if (null != tservMaxMem) {
+       log.info("Resetting {}={}", Property.TSERV_MAXMEM.getKey(), tservMaxMem);
+       getConnector().instanceOperations().setProperty(Property.TSERV_MAXMEM.getKey(), tservMaxMem);
+       tservMaxMem = null;
+       getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+       getCluster().getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+     }
+     if (null != tservMajcDelay) {
+       log.info("Resetting {}={}", Property.TSERV_MAJC_DELAY.getKey(), tservMajcDelay);
+       getConnector().instanceOperations().setProperty(Property.TSERV_MAJC_DELAY.getKey(), tservMajcDelay);
+       tservMajcDelay = null;
+     }
+   }
+
+   /**
+    * Ingests and verifies 100000 rows into a table with a 256K split threshold, waits until the table has at least 10 splits, then inspects the
+    * tablets' previous-row metadata entries and finally runs {@code CheckForMetadataProblems}, expecting exit code 0.
+    *
+    * @throws Exception
+    *           on any client, cluster-control, or assertion failure
+    */
+   @Test
+   public void tabletShouldSplit() throws Exception {
+     Connector c = getConnector();
+     String table = getUniqueNames(1)[0];
+     c.tableOperations().create(table);
+     c.tableOperations().setProperty(table, Property.TABLE_SPLIT_THRESHOLD.getKey(), "256K");
+     c.tableOperations().setProperty(table, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "1K");
+     TestIngest.Opts opts = new TestIngest.Opts();
+     VerifyIngest.Opts vopts = new VerifyIngest.Opts();
+     opts.rows = 100000;
+     opts.setTableName(table);
+
+     ClientConfiguration clientConfig = cluster.getClientConfig();
+     if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+       opts.updateKerberosCredentials(clientConfig);
+       vopts.updateKerberosCredentials(clientConfig);
+     } else {
+       opts.setPrincipal(getAdminPrincipal());
+       vopts.setPrincipal(getAdminPrincipal());
+     }
+
+     TestIngest.ingest(c, opts, new BatchWriterOpts());
+     vopts.rows = opts.rows;
+     vopts.setTableName(table);
+     VerifyIngest.verifyIngest(c, vopts, new ScannerOpts());
+     // No explicit bound here; the loop relies on the test timeout if the splits never appear
+     while (c.tableOperations().listSplits(table).size() < 10) {
+       UtilWaitThread.sleep(15 * 1000);
+     }
+     String id = c.tableOperations().tableIdMap().get(table);
+     Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+     KeyExtent extent = new KeyExtent(new Text(id), null, null);
+     s.setRange(extent.toMetadataRange());
+     MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(s);
+     int count = 0;
+     int shortened = 0;
+     for (Entry<Key,Value> entry : s) {
+       extent = new KeyExtent(entry.getKey().getRow(), entry.getValue());
+       // NOTE(review): 14 is presumably the length of a full TestIngest row id, so a shorter end row is a shortened split point - confirm
+       if (extent.getEndRow() != null && extent.getEndRow().toString().length() < 14)
+         shortened++;
+       count++;
+     }
+
+     assertTrue("Shortened should be greater than zero: " + shortened, shortened > 0);
+     assertTrue("Count should be greater than 10: " + count, count > 10);
+
+     String[] args;
+     if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+       ClusterUser rootUser = getAdminUser();
+       args = new String[] {"-i", cluster.getInstanceName(), "-u", rootUser.getPrincipal(), "--keytab", rootUser.getKeytab().getAbsolutePath(), "-z",
+           cluster.getZooKeepers()};
+     } else {
+       // Use the same principal the admin token belongs to, as the ingest options above do, rather than a hard-coded user name
+       PasswordToken token = (PasswordToken) getAdminToken();
+       args = new String[] {"-i", cluster.getInstanceName(), "-u", getAdminPrincipal(), "-p", new String(token.getPassword(), Charsets.UTF_8), "-z",
+           cluster.getZooKeepers()};
+     }
+
+     assertEquals(0, getCluster().getClusterControl().exec(CheckForMetadataProblems.class, args));
+   }
+
+   /**
+    * Runs {@code ReadWriteIT.interleaveTest} against a table with a 10K split threshold and no file compression, then waits until the table has
+    * more than 20 splits.
+    *
+    * @throws Exception
+    *           on any client or assertion failure
+    */
+   @Test
+   public void interleaveSplit() throws Exception {
+     Connector c = getConnector();
+     String tableName = getUniqueNames(1)[0];
+     c.tableOperations().create(tableName);
+     c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K");
+     c.tableOperations().setProperty(tableName, Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "none");
+     UtilWaitThread.sleep(5 * 1000);
+     ReadWriteIT.interleaveTest(c, tableName);
+     UtilWaitThread.sleep(5 * 1000);
+     int numSplits = c.tableOperations().listSplits(tableName).size();
+     // No explicit bound here; the loop relies on the test timeout if the splits never appear
+     while (numSplits <= 20) {
+       log.info("Waiting for splits to happen");
+       Thread.sleep(2000);
+       numSplits = c.tableOperations().listSplits(tableName).size();
+     }
+     assertTrue("Expected more than 20 splits, saw " + numSplits, numSplits > 20);
+   }
+
+   /**
+    * Runs {@code DeleteIT.deleteTest} against a table with a 10K split threshold, flushes it, and checks up to five times, ten seconds apart,
+    * for the table to have more than 20 splits.
+    *
+    * @throws Exception
+    *           on any client or assertion failure
+    */
+   @Test
+   public void deleteSplit() throws Exception {
+     Connector c = getConnector();
+     String tableName = getUniqueNames(1)[0];
+     c.tableOperations().create(tableName);
+     c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K");
+     ClientConfiguration clientConfig = getCluster().getClientConfig();
+     // Exactly one of password / keytab is set, depending on whether SASL is enabled
+     String password = null, keytab = null;
+     if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+       keytab = getAdminUser().getKeytab().getAbsolutePath();
+     } else {
+       password = new String(((PasswordToken) getAdminToken()).getPassword(), Charsets.UTF_8);
+     }
+     DeleteIT.deleteTest(c, getCluster(), getAdminPrincipal(), password, tableName, keytab);
+     c.tableOperations().flush(tableName, null, null, true);
+     for (int i = 0; i < 5; i++) {
+       UtilWaitThread.sleep(10 * 1000);
+       if (c.tableOperations().listSplits(tableName).size() > 20)
+         break;
+     }
+     // Report the observed count so a failure is diagnosable
+     int numSplits = c.tableOperations().listSplits(tableName).size();
+     assertTrue("Expected more than 20 splits, saw " + numSplits, numSplits > 20);
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
new file mode 100644
index 0000000..4d13e2a
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.impl.ScannerImpl;
+import org.apache.accumulo.core.client.impl.Writer;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.master.state.Assignment;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.tablets.TabletTime;
+import org.apache.accumulo.server.util.FileUtil;
+import org.apache.accumulo.server.util.MasterMetadataUtil;
+import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher;
+import org.apache.accumulo.server.zookeeper.ZooLock;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.tserver.TabletServer;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.google.common.collect.Multimap;
+
+ /**
+ * Simulates tablet splits that stop part-way through, then checks the metadata table after {@code TabletServer.verifyTabletInformation} has
+ * been called on the tablet. The scenario itself runs in {@link #main(String[])}; the JUnit test launches this class through {@code exec} and
+ * expects exit code 0.
+ */
+ public class SplitRecoveryIT extends ConfigurableMacBase {
+
+ /**
+ * @return the test timeout: 60 seconds
+ */
+ @Override
+ protected int defaultTimeoutSeconds() {
+ return 60;
+ }
+
+ /**
+ * Builds a KeyExtent from plain strings; a null {@code endRow} or {@code prevEndRow} is passed through as null.
+ */
+ private KeyExtent nke(String table, String endRow, String prevEndRow) {
+ return new KeyExtent(new Text(table), endRow == null ? null : new Text(endRow), prevEndRow == null ? null : new Text(prevEndRow));
+ }
+
+ /**
+ * Acquires a ZooKeeper lock under the instance root and runs the split-recovery scenario for tables with one, two and three tablets, each with
+ * fail points 0 and 1.
+ *
+ * @throws Exception
+ * on any ZooKeeper or metadata failure, or when a verification step fails
+ */
+ private void run() throws Exception {
+ Instance inst = HdfsZooInstance.getInstance();
+ AccumuloServerContext c = new AccumuloServerContext(new ServerConfigurationFactory(inst));
+ String zPath = ZooUtil.getRoot(inst) + "/testLock";
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+ zoo.putPersistentData(zPath, new byte[0], NodeExistsPolicy.OVERWRITE);
+ ZooLock zl = new ZooLock(zPath);
+ // The watcher terminates the JVM with -1 if the lock is lost or can no longer be monitored
+ boolean gotLock = zl.tryLock(new LockWatcher() {
+
+ @Override
+ public void lostLock(LockLossReason reason) {
+ System.exit(-1);
+
+ }
+
+ @Override
+ public void unableToMonitorLockNode(Throwable e) {
+ System.exit(-1);
+ }
+ }, "foo".getBytes(UTF_8));
+
+ // NOTE(review): a failed lock attempt is only reported on stderr; execution continues without the lock - confirm this is intended
+ if (!gotLock) {
+ System.err.println("Failed to get lock " + zPath);
+ }
+
+ // run test for a table with one tablet
+ runSplitRecoveryTest(c, 0, "sp", 0, zl, nke("foo0", null, null));
+ runSplitRecoveryTest(c, 1, "sp", 0, zl, nke("foo1", null, null));
+
+ // run test for tables with two tablets, run test on first and last tablet
+ runSplitRecoveryTest(c, 0, "k", 0, zl, nke("foo2", "m", null), nke("foo2", null, "m"));
+ runSplitRecoveryTest(c, 1, "k", 0, zl, nke("foo3", "m", null), nke("foo3", null, "m"));
+ runSplitRecoveryTest(c, 0, "o", 1, zl, nke("foo4", "m", null), nke("foo4", null, "m"));
+ runSplitRecoveryTest(c, 1, "o", 1, zl, nke("foo5", "m", null), nke("foo5", null, "m"));
+
+ // run test for table w/ three tablets, run test on middle tablet
+ runSplitRecoveryTest(c, 0, "o", 1, zl, nke("foo6", "m", null), nke("foo6", "r", "m"), nke("foo6", null, "r"));
+ runSplitRecoveryTest(c, 1, "o", 1, zl, nke("foo7", "m", null), nke("foo7", "r", "m"), nke("foo7", null, "r"));
+
+ // run test for table w/ three tablets, run test on first
+ runSplitRecoveryTest(c, 0, "g", 0, zl, nke("foo8", "m", null), nke("foo8", "r", "m"), nke("foo8", null, "r"));
+ runSplitRecoveryTest(c, 1, "g", 0, zl, nke("foo9", "m", null), nke("foo9", "r", "m"), nke("foo9", null, "r"));
+
+ // run test for table w/ three tablets, run test on last tablet
+ runSplitRecoveryTest(c, 0, "w", 2, zl, nke("fooa", "m", null), nke("fooa", "r", "m"), nke("fooa", null, "r"));
+ runSplitRecoveryTest(c, 1, "w", 2, zl, nke("foob", "m", null), nke("foob", "r", "m"), nke("foob", null, "r"));
+ }
+
+ /**
+ * Registers every extent in the metadata table with one data file each, then partially splits the extent at index {@code extentToSplit} at
+ * row {@code mr} and verifies the outcome.
+ *
+ * @param context
+ * server context used for all metadata operations
+ * @param failPoint
+ * passed on as the {@code steps} argument of {@link #splitPartiallyAndRecover}
+ * @param mr
+ * row at which the chosen extent is split
+ * @param extentToSplit
+ * index into {@code extents} of the tablet to split
+ * @param zl
+ * lock passed to the metadata operations
+ * @param extents
+ * the tablets that make up the table
+ * @throws Exception
+ * on any metadata failure or failed verification
+ */
+ private void runSplitRecoveryTest(AccumuloServerContext context, int failPoint, String mr, int extentToSplit, ZooLock zl, KeyExtent... extents)
+ throws Exception {
+
+ Text midRow = new Text(mr);
+
+ SortedMap<FileRef,DataFileValue> splitMapFiles = null;
+
+ for (int i = 0; i < extents.length; i++) {
+ KeyExtent extent = extents[i];
+
+ String tdir = ServerConstants.getTablesDirs()[0] + "/" + extent.getTableId().toString() + "/dir_" + i;
+ MetadataTableUtil.addTablet(extent, tdir, context, TabletTime.LOGICAL_TIME_ID, zl);
+ SortedMap<FileRef,DataFileValue> mapFiles = new TreeMap<FileRef,DataFileValue>();
+ mapFiles.put(new FileRef(tdir + "/" + RFile.EXTENSION + "_000_000"), new DataFileValue(1000017 + i, 10000 + i));
+
+ // Remember the file map of the tablet that will be split; it is the input to the split below
+ if (i == extentToSplit) {
+ splitMapFiles = mapFiles;
+ }
+ int tid = 0;
+ TransactionWatcher.ZooArbitrator.start(Constants.BULK_ARBITRATOR_TYPE, tid);
+ MetadataTableUtil.updateTabletDataFile(tid, extent, mapFiles, "L0", context, zl);
+ }
+
+ KeyExtent extent = extents[extentToSplit];
+
+ // high covers (midRow, original end row]; low covers (original previous end row, midRow]
+ KeyExtent high = new KeyExtent(extent.getTableId(), extent.getEndRow(), midRow);
+ KeyExtent low = new KeyExtent(extent.getTableId(), midRow, extent.getPrevEndRow());
+
+ splitPartiallyAndRecover(context, extent, high, low, .4, splitMapFiles, midRow, "localhost:1234", failPoint, zl);
+ }
+
+ /**
+ * Performs the first metadata updates of a split, then as many further updates as {@code steps} allows, calls
+ * {@code TabletServer.verifyTabletInformation} on the high tablet, and checks the resulting metadata entries.
+ *
+ * @param steps
+ * 0 writes only the split of the high tablet and its future location; at least 1 also adds the low tablet; at least 2 also finishes
+ * the split
+ * @throws Exception
+ * on any metadata failure, or when the bulk file entries or metadata entries are not as expected
+ */
+ private void splitPartiallyAndRecover(AccumuloServerContext context, KeyExtent extent, KeyExtent high, KeyExtent low, double splitRatio,
+ SortedMap<FileRef,DataFileValue> mapFiles, Text midRow, String location, int steps, ZooLock zl) throws Exception {
+
+ SortedMap<FileRef,DataFileValue> lowDatafileSizes = new TreeMap<FileRef,DataFileValue>();
+ SortedMap<FileRef,DataFileValue> highDatafileSizes = new TreeMap<FileRef,DataFileValue>();
+ List<FileRef> highDatafilesToRemove = new ArrayList<FileRef>();
+
+ MetadataTableUtil.splitDatafiles(extent.getTableId(), midRow, splitRatio, new HashMap<FileRef,FileUtil.FileInfo>(), mapFiles, lowDatafileSizes,
+ highDatafileSizes, highDatafilesToRemove);
+
+ MetadataTableUtil.splitTablet(high, extent.getPrevEndRow(), splitRatio, context, zl);
+ // Write a future location for the high tablet, naming a tablet server instance built from the given location and the lock's session id
+ TServerInstance instance = new TServerInstance(location, zl.getSessionId());
+ Writer writer = MetadataTableUtil.getMetadataTable(context);
+ Assignment assignment = new Assignment(high, instance);
+ Mutation m = new Mutation(assignment.tablet.getMetadataEntry());
+ assignment.server.putFutureLocation(m);
+ writer.update(m);
+
+ if (steps >= 1) {
+ Multimap<Long,FileRef> bulkFiles = MetadataTableUtil.getBulkFilesLoaded(context, extent);
+ MasterMetadataUtil.addNewTablet(context, low, "/lowDir", instance, lowDatafileSizes, bulkFiles, TabletTime.LOGICAL_TIME_ID + "0", -1l, -1l, zl);
+ }
+ // NOTE(review): run() only passes fail points 0 and 1, so this branch is not exercised by this class
+ if (steps >= 2) {
+ MetadataTableUtil.finishSplit(high, highDatafileSizes, highDatafilesToRemove, context, zl);
+ }
+
+ // NOTE(review): presumably this is the call that repairs the incomplete split - confirm against TabletServer
+ TabletServer.verifyTabletInformation(context, high, instance, null, "127.0.0.1:0", zl);
+
+ if (steps >= 1) {
+ ensureTabletHasNoUnexpectedMetadataEntries(context, low, lowDatafileSizes);
+ ensureTabletHasNoUnexpectedMetadataEntries(context, high, highDatafileSizes);
+
+ Multimap<Long,FileRef> lowBulkFiles = MetadataTableUtil.getBulkFilesLoaded(context, low);
+ Multimap<Long,FileRef> highBulkFiles = MetadataTableUtil.getBulkFilesLoaded(context, high);
+
+ if (!lowBulkFiles.equals(highBulkFiles)) {
+ throw new Exception(" " + lowBulkFiles + " != " + highBulkFiles + " " + low + " " + high);
+ }
+
+ if (lowBulkFiles.size() == 0) {
+ throw new Exception(" no bulk files " + low);
+ }
+ } else {
+ ensureTabletHasNoUnexpectedMetadataEntries(context, extent, mapFiles);
+ }
+ }
+
+ /**
+ * Scans the metadata range of {@code extent} and fails unless every key belongs to the tablet's metadata row, each individually expected
+ * column is seen exactly once, nothing outside the expected columns and column families is present, and the stored data file sizes match
+ * {@code expectedMapFiles}.
+ *
+ * @throws Exception
+ * describing the first unexpected, duplicated or missing entry, or the data file mismatch
+ */
+ private void ensureTabletHasNoUnexpectedMetadataEntries(AccumuloServerContext context, KeyExtent extent, SortedMap<FileRef,DataFileValue> expectedMapFiles)
+ throws Exception {
+ Scanner scanner = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY);
+ scanner.setRange(extent.toMetadataRange());
+
+ // Columns that must each appear; an entry is removed from this set when it is seen
+ HashSet<ColumnFQ> expectedColumns = new HashSet<ColumnFQ>();
+ expectedColumns.add(TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN);
+ expectedColumns.add(TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN);
+ expectedColumns.add(TabletsSection.ServerColumnFamily.TIME_COLUMN);
+ expectedColumns.add(TabletsSection.ServerColumnFamily.LOCK_COLUMN);
+
+ // Column families that may appear any number of times, or not at all
+ HashSet<Text> expectedColumnFamilies = new HashSet<Text>();
+ expectedColumnFamilies.add(DataFileColumnFamily.NAME);
+ expectedColumnFamilies.add(TabletsSection.FutureLocationColumnFamily.NAME);
+ expectedColumnFamilies.add(TabletsSection.CurrentLocationColumnFamily.NAME);
+ expectedColumnFamilies.add(TabletsSection.LastLocationColumnFamily.NAME);
+ expectedColumnFamilies.add(TabletsSection.BulkFileColumnFamily.NAME);
+
+ Iterator<Entry<Key,Value>> iter = scanner.iterator();
+ while (iter.hasNext()) {
+ Key key = iter.next().getKey();
+
+ if (!key.getRow().equals(extent.getMetadataEntry())) {
+ throw new Exception("Tablet " + extent + " contained unexpected " + MetadataTable.NAME + " entry " + key);
+ }
+
+ if (expectedColumnFamilies.contains(key.getColumnFamily())) {
+ continue;
+ }
+
+ // remove() returns false for a column that is not expected or was already seen, which falls through to the failure below
+ if (expectedColumns.remove(new ColumnFQ(key))) {
+ continue;
+ }
+
+ throw new Exception("Tablet " + extent + " contained unexpected " + MetadataTable.NAME + " entry " + key);
+ }
+ System.out.println("expectedColumns " + expectedColumns);
+ // NOTE(review): the two size checks together just mean "the set is not empty"
+ if (expectedColumns.size() > 1 || (expectedColumns.size() == 1)) {
+ throw new Exception("Not all expected columns seen " + extent + " " + expectedColumns);
+ }
+
+ SortedMap<FileRef,DataFileValue> fixedMapFiles = MetadataTableUtil.getDataFileSizes(extent, context);
+ verifySame(expectedMapFiles, fixedMapFiles);
+ }
+
+ /**
+ * Fails unless both maps have the same key set and equal values for every key.
+ *
+ * @throws Exception
+ * naming the differing key sets or the first differing entry
+ */
+ private void verifySame(SortedMap<FileRef,DataFileValue> datafileSizes, SortedMap<FileRef,DataFileValue> fixedDatafileSizes) throws Exception {
+
+ if (!datafileSizes.keySet().containsAll(fixedDatafileSizes.keySet()) || !fixedDatafileSizes.keySet().containsAll(datafileSizes.keySet())) {
+ throw new Exception("Key sets not the same " + datafileSizes.keySet() + " != " + fixedDatafileSizes.keySet());
+ }
+
+ for (Entry<FileRef,DataFileValue> entry : datafileSizes.entrySet()) {
+ DataFileValue dfv = entry.getValue();
+ DataFileValue otherDfv = fixedDatafileSizes.get(entry.getKey());
+
+ if (!dfv.equals(otherDfv)) {
+ throw new Exception(entry.getKey() + " dfv not equal " + dfv + " " + otherDfv);
+ }
+ }
+ }
+
+ /**
+ * Entry point used when this class is launched by {@link #test()}; runs the whole scenario, so an uncaught exception ends the process
+ * abnormally.
+ */
+ public static void main(String[] args) throws Exception {
+ new SplitRecoveryIT().run();
+ }
+
+ /**
+ * Launches this class through {@code exec} and expects exit code 0.
+ */
+ @Test
+ public void test() throws Exception {
+ assertEquals(0, exec(SplitRecoveryIT.class).waitFor());
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/SslIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SslIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SslIT.java
new file mode 100644
index 0000000..13248d0
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SslIT.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+/**
+ * Runs a selection of existing ITs with SSL switched on, chosen to cover a range of different connection scenarios. To run every IT against SSL-enabled mini
+ * clusters instead, use `mvn verify -DuseSslForIT`.
+ */
+ public class SslIT extends ConfigurableMacBase {
+
+   /** Timeout for each test in this class: six minutes, expressed in seconds. */
+   @Override
+   public int defaultTimeoutSeconds() {
+     return 6 * 60;
+   }
+
+   /**
+    * Applies the parent configuration and then enables SSL, using an SSL directory derived from a test directory named after this class and the running test
+    * method.
+    */
+   @Override
+   public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+     super.configure(cfg, hadoopCoreSite);
+     String testDirName = getClass().getName() + "_" + testName.getMethodName();
+     configureForSsl(cfg, getSslDir(createTestDir(testDirName)));
+   }
+
+   /** Creates a uniquely named table and runs the {@link BinaryIT} scenario against it. */
+   @Test
+   public void binary() throws AccumuloException, AccumuloSecurityException, Exception {
+     final String table = getUniqueNames(1)[0];
+     getConnector().tableOperations().create(table);
+     BinaryIT.runTest(getConnector(), table);
+   }
+
+   /** Runs the {@link ConcurrencyIT} scenario with a uniquely generated table name. */
+   @Test
+   public void concurrency() throws Exception {
+     final String table = getUniqueNames(1)[0];
+     ConcurrencyIT.runTest(getConnector(), table);
+   }
+
+   /** Runs the admin-stop scenario of {@link ShutdownIT} against this test's cluster. */
+   @Test
+   public void adminStop() throws Exception {
+     ShutdownIT.runAdminStopTest(getConnector(), getCluster());
+   }
+
+   /**
+    * Runs the {@link BulkIT} scenario on the local file system, using a {@code tmp} path under the cluster's directory and a uniquely generated table name.
+    */
+   @Test
+   public void bulk() throws Exception {
+     final FileSystem localFs = FileSystem.getLocal(new Configuration(false));
+     final Path tmpDir = new Path(getCluster().getConfig().getDir().getAbsolutePath(), "tmp");
+     final String table = getUniqueNames(1)[0];
+     BulkIT.runTest(getConnector(), localFs, tmpDir, "root", table, getClass().getName(), testName.getMethodName());
+   }
+
+   /** Runs the {@link MapReduceIT} scenario against this test's cluster. */
+   @Test
+   public void mapReduce() throws Exception {
+     MapReduceIT.runTest(getConnector(), getCluster());
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/SslWithClientAuthIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SslWithClientAuthIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SslWithClientAuthIT.java
new file mode 100644
index 0000000..bb00b19
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SslWithClientAuthIT.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+/**
+ * Repeats every test from SslIT with SSL client authentication additionally enabled.
+ *
+ * Each inherited test method is overridden with a plain delegation to the parent; the overrides exist only so that individual tests are easier to run from an
+ * IDE.
+ */
+ public class SslWithClientAuthIT extends SslIT {
+
+   /** Timeout for each test in this class: eight minutes, expressed in seconds. */
+   @Override
+   public int defaultTimeoutSeconds() {
+     return 8 * 60;
+   }
+
+   /** Applies the parent (SSL) configuration and then turns on the SSL client-auth property in the site configuration. */
+   @Override
+   public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+     super.configure(cfg, hadoopCoreSite);
+     final Map<String,String> siteConfig = cfg.getSiteConfig();
+     siteConfig.put(Property.INSTANCE_RPC_SSL_CLIENT_AUTH.getKey(), "true");
+     cfg.setSiteConfig(siteConfig);
+   }
+
+   /** Delegates to the parent's {@code binary} test. */
+   @Override
+   @Test
+   public void binary() throws AccumuloException, AccumuloSecurityException, Exception {
+     super.binary();
+   }
+
+   /** Delegates to the parent's {@code concurrency} test. */
+   @Override
+   @Test
+   public void concurrency() throws Exception {
+     super.concurrency();
+   }
+
+   /** Delegates to the parent's {@code adminStop} test. */
+   @Override
+   @Test
+   public void adminStop() throws Exception {
+     super.adminStop();
+   }
+
+   /** Delegates to the parent's {@code bulk} test. */
+   @Override
+   @Test
+   public void bulk() throws Exception {
+     super.bulk();
+   }
+
+   /** Delegates to the parent's {@code mapReduce} test. */
+   @Override
+   @Test
+   public void mapReduce() throws Exception {
+     super.mapReduce();
+   }
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/StartIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/StartIT.java b/test/src/main/java/org/apache/accumulo/test/functional/StartIT.java
new file mode 100644
index 0000000..57a8a6f
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/StartIT.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import org.apache.accumulo.cluster.ClusterControl;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.start.TestMain;
+import org.junit.Test;
+
+ /**
+  * Checks the exit status reported by the cluster control when it executes {@link TestMain} with different arguments.
+  */
+ public class StartIT extends AccumuloClusterHarness {
+
+   /** Timeout for each test in this class, in seconds. */
+   @Override
+   protected int defaultTimeoutSeconds() {
+     return 30;
+   }
+
+   /**
+    * Executes {@link TestMain} three times and checks each exit status: non-zero for the {@code "exception"} argument, zero for {@code "success"}, and
+    * non-zero when no arguments are given.
+    */
+   @Test
+   public void test() throws Exception {
+     final ClusterControl clusterControl = getCluster().getClusterControl();
+
+     // "exception" argument: a non-zero exit status is expected
+     final int exceptionExit = clusterControl.exec(TestMain.class, new String[] {"exception"});
+     assertNotEquals(0, exceptionExit);
+
+     // "success" argument: an exit status of zero is expected
+     final int successExit = clusterControl.exec(TestMain.class, new String[] {"success"});
+     assertEquals(0, successExit);
+
+     // no arguments at all: a non-zero exit status is expected
+     final int noArgsExit = clusterControl.exec(TestMain.class, new String[0]);
+     assertNotEquals(0, noArgsExit);
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/TableIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TableIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TableIT.java
new file mode 100644
index 0000000..a4678a7
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/TableIT.java
@@ -0,0 +1,108 @@
+package org.apache.accumulo.test.functional;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.FileNotFoundException;
+
+import org.apache.accumulo.cluster.AccumuloCluster;
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.VerifyIngest;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assume;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+ /**
+  * Exercises the life cycle of a table: create, ingest, flush, verify, delete, then re-create under the same name and ingest/verify again.
+  */
+ public class TableIT extends AccumuloClusterHarness {
+
+   /** Timeout for each test in this class: two minutes, expressed in seconds. */
+   @Override
+   protected int defaultTimeoutSeconds() {
+     return 2 * 60;
+   }
+
+   /**
+    * Creates a table, ingests and verifies data, and checks that data-file entries exist in the metadata table and that the table's directory is non-empty.
+    * After deleting the table it checks that those metadata entries are gone, that the directory is empty or missing, and that the name no longer maps to an
+    * id. Finally the same name is re-created, ingested, verified and deleted once more.
+    */
+   @Test
+   public void test() throws Exception {
+     // The cluster is cast to MiniAccumuloClusterImpl below, so skip unless this is a MINI cluster.
+     Assume.assumeThat(getClusterType(), CoreMatchers.is(ClusterType.MINI));
+
+     final AccumuloCluster cluster = getCluster();
+     final String rootPath = ((MiniAccumuloClusterImpl) cluster).getConfig().getDir().getAbsolutePath();
+
+     final Connector conn = getConnector();
+     final TableOperations tableOps = conn.tableOperations();
+     final String tableName = getUniqueNames(1)[0];
+     tableOps.create(tableName);
+
+     // Ingest and verify options share the same credentials and target table.
+     final TestIngest.Opts ingestOpts = new TestIngest.Opts();
+     final VerifyIngest.Opts verifyOpts = new VerifyIngest.Opts();
+     final ClientConfiguration clientConfig = getCluster().getClientConfig();
+     final boolean saslEnabled = clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false);
+     if (!saslEnabled) {
+       ingestOpts.setPrincipal(getAdminPrincipal());
+       verifyOpts.setPrincipal(getAdminPrincipal());
+     } else {
+       ingestOpts.updateKerberosCredentials(clientConfig);
+       verifyOpts.updateKerberosCredentials(clientConfig);
+     }
+     ingestOpts.setTableName(tableName);
+     verifyOpts.setTableName(tableName);
+
+     TestIngest.ingest(conn, ingestOpts, new BatchWriterOpts());
+     tableOps.flush(tableName, null, null, true);
+     VerifyIngest.verifyIngest(conn, verifyOpts, new ScannerOpts());
+
+     // While the table exists: data-file entries in the metadata table and at least one entry in its directory.
+     final String tableId = tableOps.tableIdMap().get(tableName);
+     final Path tableDir = new Path(rootPath + "/accumulo/tables/" + tableId);
+     final Scanner metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+     metaScanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
+     metaScanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+     assertTrue(Iterators.size(metaScanner.iterator()) > 0);
+
+     final FileSystem fs = getCluster().getFileSystem();
+     assertTrue(fs.listStatus(tableDir).length > 0);
+
+     // After deletion: no metadata entries, an empty (or missing) directory, and no id for the name.
+     tableOps.delete(tableName);
+     assertEquals(0, Iterators.size(metaScanner.iterator()));
+     try {
+       assertEquals(0, fs.listStatus(tableDir).length);
+     } catch (FileNotFoundException ex) {
+       // that's fine, too
+     }
+     assertNull(tableOps.tableIdMap().get(tableName));
+
+     // The same name can be created and used again.
+     tableOps.create(tableName);
+     TestIngest.ingest(conn, ingestOpts, new BatchWriterOpts());
+     VerifyIngest.verifyIngest(conn, verifyOpts, new ScannerOpts());
+     tableOps.delete(tableName);
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/TabletIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletIT.java
new file mode 100644
index 0000000..d2b1416
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletIT.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.MemoryUnit;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+ /**
+  * Writes {@code N} rows into a pre-split table and checks that a full scan returns them all, in order, both right after writing and on a second read-only
+  * pass.
+  */
+ public class TabletIT extends AccumuloClusterHarness {
+
+   /** Number of rows written to, and expected back from, the test table. */
+   private static final int N = 1000;
+
+   /** Sets the tablet server max-memory property to 128M and the mini cluster's default memory to 256 MB. */
+   @Override
+   public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+     Map<String,String> siteConfig = cfg.getSiteConfig();
+     siteConfig.put(Property.TSERV_MAXMEM.getKey(), "128M");
+     cfg.setDefaultMemory(256, MemoryUnit.MEGABYTE);
+     cfg.setSiteConfig(siteConfig);
+   }
+
+   /** Timeout for each test in this class: two minutes, expressed in seconds. */
+   @Override
+   protected int defaultTimeoutSeconds() {
+     return 2 * 60;
+   }
+
+   /** Creates, populates and scans a uniquely named table, then scans the same table again without rewriting it. */
+   @Test
+   public void createTableTest() throws Exception {
+     String tableName = getUniqueNames(1)[0];
+     createTableTest(tableName, false);
+     createTableTest(tableName, true);
+   }
+
+   /**
+    * Optionally creates and fills the table, then scans it and checks that every expected row comes back in order.
+    *
+    * @param tableName
+    *          table to create (unless {@code readOnly}) and scan
+    * @param readOnly
+    *          when {@code true} the table is assumed to exist already and is only scanned; when {@code false} it is first created, pre-split and populated
+    * @throws Exception
+    *           if any table operation, write or scan fails
+    */
+   public void createTableTest(String tableName, boolean readOnly) throws Exception {
+     // create the test table within accumulo
+     Connector connector = getConnector();
+
+     if (!readOnly) {
+       // split points at every (N / 100)-th row, skipping row 0
+       TreeSet<Text> keys = new TreeSet<Text>();
+       for (int i = N / 100; i < N; i += N / 100) {
+         keys.add(new Text(String.format("%05d", i)));
+       }
+
+       // presplit
+       connector.tableOperations().create(tableName);
+       connector.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "200");
+       connector.tableOperations().addSplits(tableName, keys);
+       BatchWriter b = connector.createBatchWriter(tableName, new BatchWriterConfig());
+
+       // populate: one entry per row, with the column family cycling through col1..col3
+       for (int i = 0; i < N; i++) {
+         Mutation m = new Mutation(new Text(String.format("%05d", i)));
+         m.put(new Text("col" + Integer.toString((i % 3) + 1)), new Text("qual"), new Value("junk".getBytes(UTF_8)));
+         b.addMutation(m);
+       }
+       b.close();
+     }
+
+     Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+     int count = 0;
+     for (Entry<Key,Value> elt : scanner) {
+       String expected = String.format("%05d", count);
+       // Use a JUnit assertion rather than the Java assert keyword: the keyword is skipped when the JVM runs
+       // without -ea, which would leave the row order unchecked, and it reports no expected/actual values.
+       assertEquals(expected, elt.getKey().getRow().toString());
+       count++;
+     }
+     assertEquals(N, count);
+   }
+
+ }