You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/12/02 21:37:15 UTC
[27/50] [abbrv] accumulo git commit: ACCUMULO-3167 Get mapreduce working against real cluster. Port other tests to AccumuloClusterIT
ACCUMULO-3167 Get mapreduce working against real cluster. Port other tests to AccumuloClusterIT
Had to change ExamplesIT to account for ACCUMULO-3364.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ee1694ca
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ee1694ca
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ee1694ca
Branch: refs/heads/metrics2
Commit: ee1694caa4cd329a01a4bf591f20e30fb91e0658
Parents: b130b1d
Author: Josh Elser <el...@apache.org>
Authored: Mon Nov 24 13:23:44 2014 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Mon Nov 24 18:09:24 2014 -0500
----------------------------------------------------------------------
.../standalone/StandaloneClusterControl.java | 36 +++++++--
.../accumulo/harness/AccumuloClusterIT.java | 6 ++
.../accumulo/test/BulkImportVolumeIT.java | 37 ++++++---
.../test/functional/BinaryStressIT.java | 65 ++++++++++++---
.../accumulo/test/functional/CleanTmpIT.java | 55 ++++++++++---
.../accumulo/test/functional/CompactionIT.java | 85 +++++++++++++++-----
.../accumulo/test/functional/ExamplesIT.java | 33 ++++++--
7 files changed, 255 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ee1694ca/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneClusterControl.java
----------------------------------------------------------------------
diff --git a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneClusterControl.java b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneClusterControl.java
index 29cc1e8..378cb6b 100644
--- a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneClusterControl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneClusterControl.java
@@ -20,6 +20,8 @@ import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
+import java.net.URL;
+import java.security.CodeSource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -43,14 +45,14 @@ import com.google.common.collect.Maps;
public class StandaloneClusterControl implements ClusterControl {
private static final Logger log = LoggerFactory.getLogger(StandaloneClusterControl.class);
- private static final String START_SERVER_SCRIPT = "start-server.sh", ACCUMULO_SCRIPT = "accumulo";
+ private static final String START_SERVER_SCRIPT = "start-server.sh", ACCUMULO_SCRIPT = "accumulo", TOOL_SCRIPT = "tool.sh";
private static final String MASTER_HOSTS_FILE = "masters", GC_HOSTS_FILE = "gc", TSERVER_HOSTS_FILE = "slaves", TRACER_HOSTS_FILE = "tracers",
MONITOR_HOSTS_FILE = "monitor";
protected String accumuloHome, accumuloConfDir;
protected RemoteShellOptions options;
- protected String startServerPath, accumuloPath;
+ protected String startServerPath, accumuloPath, toolPath;
public StandaloneClusterControl() {
this(System.getenv("ACCUMULO_HOME"), System.getenv("ACCUMULO_CONF_DIR"));
@@ -62,10 +64,9 @@ public class StandaloneClusterControl implements ClusterControl {
this.accumuloConfDir = accumuloConfDir;
File bin = new File(accumuloHome, "bin");
- File startServer = new File(bin, "start-server.sh");
- this.startServerPath = startServer.getAbsolutePath();
- File accumulo = new File(bin, "accumulo");
- this.accumuloPath = accumulo.getAbsolutePath();
+ this.startServerPath = new File(bin, START_SERVER_SCRIPT).getAbsolutePath();
+ this.accumuloPath = new File(bin, ACCUMULO_SCRIPT).getAbsolutePath();
+ this.toolPath = new File(bin, TOOL_SCRIPT).getAbsolutePath();
}
protected Entry<Integer,String> exec(String hostname, String[] command) throws IOException {
@@ -101,6 +102,29 @@ public class StandaloneClusterControl implements ClusterControl {
return exec(master, cmd);
}
+ public Entry<Integer,String> execMapreduceWithStdout(Class<?> clz, String[] args) throws IOException {
+ File confDir = getConfDir();
+ String master = getHosts(new File(confDir, "masters")).get(0);
+ String[] cmd = new String[3 + args.length];
+ cmd[0] = toolPath;
+ CodeSource source = clz.getProtectionDomain().getCodeSource();
+ if (null == source) {
+ throw new RuntimeException("Could not get CodeSource for class");
+ }
+ URL jarUrl = source.getLocation();
+ String jar = jarUrl.getPath();
+ if (!jar.endsWith(".jar")) {
+ throw new RuntimeException("Need to have a jar to run mapreduce: " + jar);
+ }
+ cmd[1] = jar;
+ cmd[2] = clz.getName();
+ for (int i = 0, j = 3; i < args.length; i++, j++) {
+ cmd[j] = "'" + args[i] + "'";
+ }
+ log.info("Running: '{}' on {}", StringUtils.join(cmd, " "), master);
+ return exec(master, cmd);
+ }
+
@Override
public void adminStopAll() throws IOException {
File confDir = getConfDir();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ee1694ca/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java b/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java
index 38b615f..6c496e9 100644
--- a/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java
+++ b/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.harness.conf.AccumuloClusterPropertyConfiguration;
import org.apache.accumulo.harness.conf.StandaloneAccumuloClusterConfiguration;
import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Assume;
@@ -166,6 +167,11 @@ public abstract class AccumuloClusterIT extends AccumuloIT implements MiniCluste
return clusterConf.getToken();
}
+ public static FileSystem getFileSystem() throws IOException {
+ Preconditions.checkState(initialized);
+ return cluster.getFileSystem();
+ }
+
public Connector getConnector() {
try {
return cluster.getConnector(getPrincipal(), getToken());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ee1694ca/test/src/test/java/org/apache/accumulo/test/BulkImportVolumeIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/BulkImportVolumeIT.java b/test/src/test/java/org/apache/accumulo/test/BulkImportVolumeIT.java
index 79a2513..78cdfe6 100644
--- a/test/src/test/java/org/apache/accumulo/test/BulkImportVolumeIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/BulkImportVolumeIT.java
@@ -23,15 +23,19 @@ import java.io.File;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.harness.AccumuloClusterIT;
import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
// ACCUMULO-118/ACCUMULO-2504
-public class BulkImportVolumeIT extends ConfigurableMacIT {
+public class BulkImportVolumeIT extends AccumuloClusterIT {
+ private static final Logger log = LoggerFactory.getLogger(BulkImportVolumeIT.class);
File volDirBase = null;
Path v1, v2;
@@ -42,7 +46,7 @@ public class BulkImportVolumeIT extends ConfigurableMacIT {
}
@Override
- public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
File baseDir = cfg.getDir();
volDirBase = new File(baseDir, "volumes");
File v1f = new File(volDirBase, "v1");
@@ -64,15 +68,26 @@ public class BulkImportVolumeIT extends ConfigurableMacIT {
String tableName = getUniqueNames(1)[0];
TableOperations to = getConnector().tableOperations();
to.create(tableName);
- File bulk = new File(rootPath() + "/bulk");
- System.out.println("bulk: " + bulk);
- assertTrue(bulk.mkdirs());
- File err = new File(rootPath() + "/err");
- assertTrue(err.mkdirs());
- File bogus = new File(bulk + "/bogus.rf");
- assertTrue(bogus.createNewFile());
+ FileSystem fs = getFileSystem();
+ String rootPath = getUsableDir();
+ Path bulk = new Path(rootPath, "bulk");
+ log.info("bulk: {}", bulk);
+ if (fs.exists(bulk)) {
+ fs.delete(bulk, true);
+ }
+ assertTrue(fs.mkdirs(bulk));
+ Path err = new Path(rootPath, "err");
+ log.info("err: {}", err);
+ if (fs.exists(err)) {
+ fs.delete(err, true);
+ }
+ assertTrue(fs.mkdirs(err));
+ Path bogus = new Path(bulk, "bogus.rf");
+ fs.create(bogus).close();
+ log.info("bogus: {}", bogus);
+ assertTrue(fs.exists(bogus));
to.importDirectory(tableName, bulk.toString(), err.toString(), false);
- assertEquals(1, err.list().length);
+ assertEquals(1, fs.listStatus(err).length);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ee1694ca/test/src/test/java/org/apache/accumulo/test/functional/BinaryStressIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/BinaryStressIT.java b/test/src/test/java/org/apache/accumulo/test/functional/BinaryStressIT.java
index 2a086a4..29f1f57 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/BinaryStressIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/BinaryStressIT.java
@@ -19,19 +19,30 @@ package org.apache.accumulo.test.functional;
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.InstanceOperations;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterIT;
+import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
-public class BinaryStressIT extends ConfigurableMacIT {
+public class BinaryStressIT extends AccumuloClusterIT {
@Override
protected int defaultTimeoutSeconds() {
@@ -39,13 +50,45 @@ public class BinaryStressIT extends ConfigurableMacIT {
}
@Override
- public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
Map<String,String> siteConfig = new HashMap<String,String>();
siteConfig.put(Property.TSERV_MAXMEM.getKey(), "50K");
siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "0");
cfg.setSiteConfig(siteConfig );
}
+ private String majcDelay, maxMem;
+
+ @Before
+ public void alterConfig() throws Exception {
+ if (ClusterType.MINI == getClusterType()) {
+ return;
+ }
+
+ InstanceOperations iops = getConnector().instanceOperations();
+ Map<String,String> conf = iops.getSystemConfiguration();
+ majcDelay = conf.get(Property.TSERV_MAJC_DELAY.getKey());
+ maxMem = conf.get(Property.TSERV_MAXMEM.getKey());
+
+ iops.setProperty(Property.TSERV_MAJC_DELAY.getKey(), "0");
+ iops.setProperty(Property.TSERV_MAXMEM.getKey(), "50K");
+
+ getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+ getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+ }
+
+ @After
+ public void resetConfig() throws Exception {
+ if (null != majcDelay) {
+ InstanceOperations iops = getConnector().instanceOperations();
+ iops.setProperty(Property.TSERV_MAJC_DELAY.getKey(), majcDelay);
+ iops.setProperty(Property.TSERV_MAXMEM.getKey(), maxMem);
+
+ getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+ getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+ }
+ }
+
@Test
public void binaryStressTest() throws Exception {
Connector c = getConnector();
@@ -54,9 +97,13 @@ public class BinaryStressIT extends ConfigurableMacIT {
c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K");
BinaryIT.runTest(c, tableName);
String id = c.tableOperations().tableIdMap().get(tableName);
- FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
- FileStatus[] dir = fs.listStatus(new Path(cluster.getConfig().getDir() + "/accumulo/tables/" + id));
- assertTrue(dir.length > 7);
+ Set<Text> tablets = new HashSet<Text>();
+ Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ s.setRange(Range.prefix(id));
+ for (Entry<Key,Value> entry : s) {
+ tablets.add(entry.getKey().getRow());
+ }
+ assertTrue("Expected at least 8 tablets, saw " + tablets.size(), tablets.size() > 7);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ee1694ca/test/src/test/java/org/apache/accumulo/test/functional/CleanTmpIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/CleanTmpIT.java b/test/src/test/java/org/apache/accumulo/test/functional/CleanTmpIT.java
index 676f6d7..1387983 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/CleanTmpIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/CleanTmpIT.java
@@ -16,30 +16,44 @@
*/
package org.apache.accumulo.test.functional;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterIT;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.minicluster.impl.ProcessReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class CleanTmpIT extends ConfigurableMacIT {
+import com.google.common.collect.Iterables;
+
+public class CleanTmpIT extends AccumuloClusterIT {
+ private static final Logger log = LoggerFactory.getLogger(CleanTmpIT.class);
@Override
- public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
Map<String,String> props = new HashMap<String,String>();
props.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "3s");
cfg.setSiteConfig(props);
@@ -63,20 +77,39 @@ public class CleanTmpIT extends ConfigurableMacIT {
Mutation m = new Mutation("row");
m.put("cf", "cq", "value");
bw.addMutation(m);
+ bw.flush();
+
+ // Compact memory to make a file
+ c.tableOperations().compact(tableName, null, null, true, true);
+
+ // Make sure that we'll have a WAL
+ m = new Mutation("row2");
+ m.put("cf", "cq", "value");
+ bw.addMutation(m);
bw.close();
// create a fake _tmp file in its directory
String id = c.tableOperations().tableIdMap().get(tableName);
- FileSystem fs = getCluster().getFileSystem();
- Path tmp = new Path("/accumulo/tables/" + id + "/default_tablet/junk.rf_tmp");
+ Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ s.setRange(Range.prefix(id));
+ s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+ Entry<Key,Value> entry = Iterables.getOnlyElement(s);
+ Path file = new Path(entry.getKey().getColumnQualifier().toString());
+
+ FileSystem fs = getFileSystem();
+ assertTrue("Could not find file: " + file, fs.exists(file));
+ Path tabletDir = file.getParent();
+ assertNotNull("Tablet dir should not be null", tabletDir);
+ Path tmp = new Path(tabletDir, "junk.rf_tmp");
+ // Make the file
fs.create(tmp).close();
- for (ProcessReference tserver : getCluster().getProcesses().get(ServerType.TABLET_SERVER)) {
- getCluster().killProcess(ServerType.TABLET_SERVER, tserver);
- }
- getCluster().start();
+ log.info("Created tmp file {}", tmp.toString());
+ getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+ getClusterControl().startAllServers(ServerType.TABLET_SERVER);
Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY);
- FunctionalTestUtils.count(scanner);
- assertFalse(fs.exists(tmp));
+ assertEquals(2, FunctionalTestUtils.count(scanner));
+ // If we performed log recovery, we should have cleaned up any stray files
+ assertFalse("File still exists: " + tmp, fs.exists(tmp));
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ee1694ca/test/src/test/java/org/apache/accumulo/test/functional/CompactionIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/test/java/org/apache/accumulo/test/functional/CompactionIT.java
index b659913..dc877dc 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -16,11 +16,9 @@
*/
package org.apache.accumulo.test.functional;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -30,27 +28,29 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.core.cli.ScannerOpts;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.InstanceOperations;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.harness.AccumuloClusterIT;
+import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.util.Admin;
import org.apache.accumulo.test.VerifyIngest;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.junit.Rule;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class CompactionIT extends ConfigurableMacIT {
-
- @Rule
- public TemporaryFolder folder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+public class CompactionIT extends AccumuloClusterIT {
+ private static final Logger log = LoggerFactory.getLogger(CompactionIT.class);
@Override
- public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
Map<String,String> map = new HashMap<String,String>();
map.put(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "4");
map.put(Property.TSERV_MAJC_DELAY.getKey(), "1");
@@ -63,14 +63,52 @@ public class CompactionIT extends ConfigurableMacIT {
return 4 * 60;
}
+ private String majcThreadMaxOpen, majcDelay, majcMaxConcurrent;
+
+ @Before
+ public void alterConfig() throws Exception {
+ if (ClusterType.STANDALONE == getClusterType()) {
+ InstanceOperations iops = getConnector().instanceOperations();
+ Map<String,String> config = iops.getSystemConfiguration();
+ majcThreadMaxOpen = config.get(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey());
+ majcDelay = config.get(Property.TSERV_MAJC_DELAY.getKey());
+ majcMaxConcurrent = config.get(Property.TSERV_MAJC_MAXCONCURRENT.getKey());
+
+ iops.setProperty(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "4");
+ iops.setProperty(Property.TSERV_MAJC_DELAY.getKey(), "1");
+ iops.setProperty(Property.TSERV_MAJC_MAXCONCURRENT.getKey(), "1");
+
+ getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+ getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+ }
+ }
+
+ @After
+ public void resetConfig() throws Exception {
+ // We set the values..
+ if (null != majcThreadMaxOpen) {
+ InstanceOperations iops = getConnector().instanceOperations();
+
+ iops.setProperty(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), majcThreadMaxOpen);
+ iops.setProperty(Property.TSERV_MAJC_DELAY.getKey(), majcDelay);
+ iops.setProperty(Property.TSERV_MAJC_MAXCONCURRENT.getKey(), majcMaxConcurrent);
+
+ getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+ getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+ }
+ }
+
@Test
public void test() throws Exception {
final Connector c = getConnector();
- c.tableOperations().create("test_ingest");
- c.tableOperations().setProperty("test_ingest", Property.TABLE_MAJC_RATIO.getKey(), "1.0");
- FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
- FunctionalTestUtils.createRFiles(c, fs, folder.getRoot() + "/testrf", 500000, 59, 4);
- FunctionalTestUtils.bulkImport(c, fs, "test_ingest", folder.getRoot() + "/testrf");
+ final String tableName = getUniqueNames(1)[0];
+ c.tableOperations().create(tableName);
+ c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "1.0");
+ FileSystem fs = getFileSystem();
+ String root = getUsableDir();
+ Path testrf = new Path(root, "testrf");
+ FunctionalTestUtils.createRFiles(c, fs, testrf.toString(), 500000, 59, 4);
+ FunctionalTestUtils.bulkImport(c, fs, tableName, testrf.toString());
int beforeCount = countFiles(c);
final AtomicBoolean fail = new AtomicBoolean(false);
@@ -89,8 +127,10 @@ public class CompactionIT extends ConfigurableMacIT {
opts.random = 56;
opts.dataSize = 50;
opts.cols = 1;
+ opts.tableName = tableName;
VerifyIngest.verifyIngest(c, opts, new ScannerOpts());
} catch (Exception ex) {
+ log.warn("Got exception verifying data", ex);
fail.set(true);
}
}
@@ -100,12 +140,21 @@ public class CompactionIT extends ConfigurableMacIT {
}
for (Thread t : threads)
t.join();
- assertFalse(fail.get());
+ assertFalse("Failed to successfully run all threads, Check the test output for error", fail.get());
}
int finalCount = countFiles(c);
assertTrue(finalCount < beforeCount);
- assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
+ try {
+ getClusterControl().adminStopAll();
+ } finally {
+ // Make sure the internal state in the cluster is reset (e.g. processes in MAC)
+ getCluster().stop();
+ if (ClusterType.STANDALONE == getClusterType()) {
+ // Then restart things for the next test if it's a standalone
+ getCluster().start();
+ }
+ }
}
private int countFiles(Connector c) throws Exception {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ee1694ca/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java
index 047e69d..fbaf243 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java
@@ -31,6 +31,7 @@ import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.accumulo.cluster.standalone.StandaloneClusterControl;
import org.apache.accumulo.core.cli.BatchWriterOpts;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
@@ -86,6 +87,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.Tool;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
@@ -243,8 +245,11 @@ public class ExamplesIT extends AccumuloClusterIT {
c.tableOperations().attachIterator(table, is);
bw = c.createBatchWriter(table, bwc);
+ // Write two mutations otherwise the NativeMap would dedupe them into a single update
Mutation m = new Mutation("foo");
m.put("a", "b", "1");
+ bw.addMutation(m);
+ m = new Mutation("foo");
m.put("a", "b", "3");
bw.addMutation(m);
bw.flush();
@@ -349,12 +354,21 @@ public class ExamplesIT extends AccumuloClusterIT {
@Test
public void testTeraSortAndRead() throws Exception {
String tableName = getUniqueNames(1)[0];
- goodExec(TeraSortIngest.class, "--count", (1000 * 1000) + "", "-nk", "10", "-xk", "10", "-nv", "10", "-xv", "10", "-t", tableName, "-i", instance, "-z",
+ goodExec(TeraSortIngest.class, "--count", (1000 * 1000) + "", "-nk", "10", "-xk", "10", "-nv", "10", "-xv", "10", "-t", tableName, "-i", instance,
+ "-z",
keepers, "-u", user, "-p", passwd, "--splits", "4");
- goodExec(RegexExample.class, "-i", instance, "-z", keepers, "-u", user, "-p", passwd, "-t", tableName, "--rowRegex", ".*999.*", "--output", dir
- + "/tmp/nines");
+ Path output = new Path(dir, "tmp/nines");
+ if (fs.exists(output)) {
+ fs.delete(output, true);
+ }
+ goodExec(RegexExample.class, "-i", instance, "-z", keepers, "-u", user, "-p", passwd, "-t", tableName, "--rowRegex", ".*999.*", "--output",
+ output.toString());
goodExec(RowHash.class, "-i", instance, "-z", keepers, "-u", user, "-p", passwd, "-t", tableName, "--column", "c:");
- goodExec(TableToFile.class, "-i", instance, "-z", keepers, "-u", user, "-p", passwd, "-t", tableName, "--output", dir + "/tmp/tableFile");
+ output = new Path(dir, "tmp/tableFile");
+ if (fs.exists(output)) {
+ fs.delete(output, true);
+ }
+ goodExec(TableToFile.class, "-i", instance, "-z", keepers, "-u", user, "-p", passwd, "-t", tableName, "--output", output.toString());
}
@Test
@@ -422,9 +436,14 @@ public class ExamplesIT extends AccumuloClusterIT {
}
private void goodExec(Class<?> theClass, String... args) throws InterruptedException, IOException {
- // We're already slurping stdout into memory (not redirecting to file). Might as well add it to error message.
- Entry<Integer,String> pair = getClusterControl().execWithStdout(theClass, args);
+ Entry<Integer,String> pair;
+ if (Tool.class.isAssignableFrom(theClass) && ClusterType.STANDALONE == getClusterType()) {
+ StandaloneClusterControl control = (StandaloneClusterControl) getClusterControl();
+ pair = control.execMapreduceWithStdout(theClass, args);
+ } else {
+ // We're already slurping stdout into memory (not redirecting to file). Might as well add it to error message.
+ pair = getClusterControl().execWithStdout(theClass, args);
+ }
Assert.assertEquals("stdout=" + pair.getValue(), 0, pair.getKey().intValue());
}
-
}