You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/01/17 07:36:44 UTC
svn commit: r1559022 [2/2] - in /pig/trunk: ./ ivy/
src/org/apache/pig/backend/hadoop/accumulo/ test/
test/org/apache/pig/backend/hadoop/accumulo/
Added: pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAbstractAccumuloStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAbstractAccumuloStorage.java?rev=1559022&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAbstractAccumuloStorage.java (added)
+++ pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAbstractAccumuloStorage.java Fri Jan 17 06:36:44 2014
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.backend.hadoop.accumulo;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DefaultDataBag;
+import org.apache.pig.data.Tuple;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestAbstractAccumuloStorage { // setLocation/setStoreLocation tests plus public helpers for building and comparing expected state
+ /** Asserts that every key found in either configuration has the same value in both. */
+ public static void assertConfigurationsEqual(Configuration expectedConf,
+ Configuration actualConf) {
+ // Pass 1: compare the value of every key that appears in the expected conf
+ Iterator<Entry<String, String>> expectedIter = expectedConf.iterator();
+ while (expectedIter.hasNext()) {
+ Entry<String, String> e = expectedIter.next();
+ assertEquals("Values differed for " + e.getKey(),
+ expectedConf.get(e.getKey()), actualConf.get(e.getKey()));
+ }
+ // Pass 2: the same comparison driven by the actual conf's keys, so keys that only the actual conf holds are compared as well
+ Iterator<Entry<String, String>> actualIter = actualConf.iterator();
+ while (actualIter.hasNext()) {
+ Entry<String, String> e = actualIter.next();
+ assertEquals("Values differed for " + e.getKey(),
+ expectedConf.get(e.getKey()), actualConf.get(e.getKey()));
+ }
+ }
+ /** Asserts that a six-field tuple (row, cf, cq, visibility, timestamp, value) matches the given Key and Value. */
+ public static void assertKeyValueEqualsTuple(Key key, Value value,
+ Tuple tuple) throws ExecException {
+ assertTrue(Arrays.equals(key.getRow().getBytes(),
+ ((DataByteArray) tuple.get(0)).get()));
+ assertTrue(Arrays.equals(key.getColumnFamily().getBytes(),
+ ((DataByteArray) tuple.get(1)).get()));
+ assertTrue(Arrays.equals(key.getColumnQualifier().getBytes(),
+ ((DataByteArray) tuple.get(2)).get()));
+ assertTrue(Arrays.equals(key.getColumnVisibility().getBytes(),
+ ((DataByteArray) tuple.get(3)).get()));
+ assertEquals(key.getTimestamp(), ((Long) tuple.get(4)).longValue());
+ assertTrue(Arrays.equals(value.get(),
+ ((DataByteArray) tuple.get(5)).get()));
+ }
+ /** Asserts that a Key/Value decodable by WholeRowIterator matches a tuple of (row, bag of per-column tuples). */
+ public static void assertWholeRowKeyValueEqualsTuple(Key key, Value value,
+ Tuple mainTuple) throws IOException {
+ assertTrue(Arrays.equals(key.getRow().getBytes(),
+ ((DataByteArray) mainTuple.get(0)).get()));
+ // Field 1 is cast to DefaultDataBag; its tuples are consumed in the bag's iteration order
+ DefaultDataBag bag = (DefaultDataBag) mainTuple.get(1);
+ Iterator<Tuple> iter = bag.iterator();
+ // NOTE(review): iter.next() is not guarded by hasNext(), and tuples left in the bag after the loop are never checked - confirm this is intended
+ for (Entry<Key, Value> e : WholeRowIterator.decodeRow(key, value)
+ .entrySet()) {
+ Tuple tuple = iter.next();
+ // Per-column tuple layout: (cf, cq, visibility, timestamp, value)
+ assertTrue(Arrays.equals(e.getKey().getColumnFamily().getBytes(),
+ ((DataByteArray) tuple.get(0)).get()));
+ assertTrue(Arrays.equals(
+ e.getKey().getColumnQualifier().getBytes(),
+ ((DataByteArray) tuple.get(1)).get()));
+ assertTrue(Arrays.equals(e.getKey().getColumnVisibility()
+ .getBytes(), ((DataByteArray) tuple.get(2)).get()));
+ assertEquals(e.getKey().getTimestamp(),
+ ((Long) tuple.get(3)).longValue());
+ assertTrue(Arrays.equals(e.getValue().get(),
+ ((DataByteArray) tuple.get(4)).get()));
+ }
+ }
+ /** Builds the Job the load test compares against, configured directly through AccumuloInputFormat. */
+ public Job getExpectedLoadJob(String inst, String zookeepers, String user,
+ String password, String table, String start, String end,
+ Authorizations authorizations,
+ List<Pair<Text, Text>> columnFamilyColumnQualifierPairs)
+ throws IOException {
+ Collection<Range> ranges = new LinkedList<Range>();
+ ranges.add(new Range(start, end));
+ // The Range built from start and end above is the only range handed to setRanges
+ Job expected = new Job(new Configuration());
+ // An AccumuloSecurityException is turned into a test failure instead of being propagated
+ try {
+ AccumuloInputFormat.setConnectorInfo(expected, user,
+ new PasswordToken(password));
+ } catch (AccumuloSecurityException e) {
+ Assert.fail(e.getMessage());
+ }
+ AccumuloInputFormat.setInputTableName(expected, table);
+ AccumuloInputFormat.setScanAuthorizations(expected, authorizations);
+ AccumuloInputFormat.setZooKeeperInstance(expected, inst, zookeepers);
+ AccumuloInputFormat.fetchColumns(expected,
+ columnFamilyColumnQualifierPairs);
+ AccumuloInputFormat.setRanges(expected, ranges);
+
+ return expected;
+ }
+ /** Expected load Job for the same values that {@link #getDefaultLoadLocation()} encodes in its URI. */
+ public Job getDefaultExpectedLoadJob() throws IOException {
+ String inst = "myinstance";
+ String zookeepers = "127.0.0.1:2181";
+ String user = "root";
+ String password = "secret";
+ String table = "table1";
+ String start = "abc";
+ String end = "z";
+ Authorizations authorizations = new Authorizations(
+ "PRIVATE,PUBLIC".split(","));
+ // Counterpart of fetch_columns=col1:cq1,col2:cq2,col3 in the load URI; col3 has no qualifier, hence the null
+ List<Pair<Text, Text>> columnFamilyColumnQualifierPairs = new LinkedList<Pair<Text, Text>>();
+ columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>(new Text(
+ "col1"), new Text("cq1")));
+ columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>(new Text(
+ "col2"), new Text("cq2")));
+ columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>(new Text(
+ "col3"), null));
+
+ Job expected = getExpectedLoadJob(inst, zookeepers, user, password,
+ table, start, end, authorizations,
+ columnFamilyColumnQualifierPairs);
+ return expected;
+ }
+ /** Builds the Job the store test compares against, configured directly through AccumuloOutputFormat. */
+ public Job getExpectedStoreJob(String inst, String zookeepers, String user,
+ String password, String table, long maxWriteBufferSize,
+ int writeThreads, int maxWriteLatencyMS) throws IOException {
+ // NOTE(review): the table parameter is never read in this method
+ Job expected = new Job(new Configuration());
+
+ try {
+ AccumuloOutputFormat.setConnectorInfo(expected, user,
+ new PasswordToken(password));
+ } catch (AccumuloSecurityException e) {
+ Assert.fail(e.getMessage());
+ }
+
+ AccumuloOutputFormat.setZooKeeperInstance(expected, inst, zookeepers);
+ AccumuloOutputFormat.setCreateTables(expected, true);
+ // Batch writer settings come straight from the three numeric parameters
+ BatchWriterConfig bwConfig = new BatchWriterConfig();
+ bwConfig.setMaxLatency(maxWriteLatencyMS, TimeUnit.MILLISECONDS);
+ bwConfig.setMaxMemory(maxWriteBufferSize);
+ bwConfig.setMaxWriteThreads(writeThreads);
+
+ AccumuloOutputFormat.setBatchWriterOptions(expected, bwConfig);
+
+ return expected;
+ }
+ /** Expected store Job for the same values that {@link #getDefaultStoreLocation()} encodes in its URI. */
+ public Job getDefaultExpectedStoreJob() throws IOException {
+ String inst = "myinstance";
+ String zookeepers = "127.0.0.1:2181";
+ String user = "root";
+ String password = "secret";
+ String table = "table1";
+ long maxWriteBufferSize = 1234000;
+ int writeThreads = 7;
+ int maxWriteLatencyMS = 30000;
+
+ Job expected = getExpectedStoreJob(inst, zookeepers, user, password,
+ table, maxWriteBufferSize, writeThreads, maxWriteLatencyMS);
+ return expected;
+ }
+ /** Load location URI carrying the connection, auths, fetch_columns and start/end query parameters. */
+ public String getDefaultLoadLocation() {
+ return "accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&fetch_columns=col1:cq1,col2:cq2,col3&start=abc&end=z";
+ }
+ /** Store location URI carrying the connection and write_buffer_size_bytes/write_threads/write_latency_ms parameters. */
+ public String getDefaultStoreLocation() {
+ return "accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&write_buffer_size_bytes=1234000&write_threads=7&write_latency_ms=30000";
+ }
+ /** Storage stub created with empty column and argument strings. */
+ public static AbstractAccumuloStorage getAbstractAccumuloStorage()
+ throws ParseException, IOException {
+ return getAbstractAccumuloStorage("");
+ }
+ /** Storage stub created with the given columns and an empty argument string. */
+ public static AbstractAccumuloStorage getAbstractAccumuloStorage(
+ String columns) throws ParseException, IOException {
+ return getAbstractAccumuloStorage(columns, "");
+ }
+ /** Anonymous AbstractAccumuloStorage subclass whose getMutations and getTuple overrides both return null. */
+ public static AbstractAccumuloStorage getAbstractAccumuloStorage(
+ String columns, String args) throws ParseException, IOException {
+ return new AbstractAccumuloStorage(columns, args) {
+
+ @Override
+ public Collection<Mutation> getMutations(Tuple tuple) {
+ return null;
+ }
+
+ @Override
+ protected Tuple getTuple(Key key, Value value) throws IOException {
+ return null;
+ }
+ };
+ }
+ /** setLocation on the default load URI must leave the Job configuration equal to the hand-built expected one. */
+ @Test
+ public void testSetLoadLocation() throws IOException, ParseException {
+ AbstractAccumuloStorage s = getAbstractAccumuloStorage();
+
+ Job actual = new Job();
+ s.setLocation(getDefaultLoadLocation(), actual);
+ Configuration actualConf = actual.getConfiguration();
+
+ Job expected = getDefaultExpectedLoadJob();
+ Configuration expectedConf = expected.getConfiguration();
+ // Applied to the expected conf before comparing - presumably setLocation does the same to the actual conf; confirm in AbstractAccumuloStorage
+ s.loadDependentJars(expectedConf);
+
+ assertConfigurationsEqual(expectedConf, actualConf);
+ }
+ /** setStoreLocation on the default store URI must leave the Job configuration equal to the hand-built expected one. */
+ @Test
+ public void testSetStoreLocation() throws IOException, ParseException {
+ AbstractAccumuloStorage s = getAbstractAccumuloStorage();
+
+ Job actual = new Job();
+ s.setStoreLocation(getDefaultStoreLocation(), actual);
+ Configuration actualConf = actual.getConfiguration();
+
+ Job expected = getDefaultExpectedStoreJob();
+ Configuration expectedConf = expected.getConfiguration();
+ // Applied to the expected conf before comparing - presumably setStoreLocation does the same to the actual conf; confirm in AbstractAccumuloStorage
+ s.loadDependentJars(expectedConf);
+
+ assertConfigurationsEqual(expectedConf, actualConf);
+ }
+}
Added: pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloBinaryConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloBinaryConverter.java?rev=1559022&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloBinaryConverter.java (added)
+++ pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloBinaryConverter.java Fri Jan 17 06:36:44 2014
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.accumulo;
+
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestAccumuloBinaryConverter { // round-trip tests: toBytes followed by the matching bytesToX call
+ // Replaced with a new instance by setup() before every test
+ AccumuloBinaryConverter converter;
+ /** Creates the converter under test. */
+ @Before
+ public void setup() {
+ converter = new AccumuloBinaryConverter();
+ }
+ /** Integer.MAX_VALUE must survive toBytes followed by bytesToInteger. */
+ @Test
+ public void testInts() throws IOException {
+ byte[] b = converter.toBytes(Integer.MAX_VALUE);
+ Assert.assertEquals(Integer.MAX_VALUE, converter.bytesToInteger(b)
+ .intValue());
+ }
+ /** Double.MAX_VALUE must survive toBytes followed by bytesToDouble, compared with a delta of 0. */
+ @Test
+ public void testDoubles() throws IOException {
+ byte[] b = converter.toBytes(Double.MAX_VALUE);
+ Assert.assertEquals(Double.MAX_VALUE, converter.bytesToDouble(b)
+ .doubleValue(), 0);
+ }
+ /** Long.MAX_VALUE must survive toBytes followed by bytesToLong. */
+ @Test
+ public void testLongs() throws IOException {
+ byte[] b = converter.toBytes(Long.MAX_VALUE);
+ Assert.assertEquals(Long.MAX_VALUE, converter.bytesToLong(b)
+ .longValue());
+ }
+ /** Float.MAX_VALUE must survive toBytes followed by bytesToFloat, compared with a delta of 0. */
+ @Test
+ public void testFloats() throws IOException {
+ byte[] b = converter.toBytes(Float.MAX_VALUE);
+ Assert.assertEquals(Float.MAX_VALUE, converter.bytesToFloat(b)
+ .floatValue(), 0f);
+ }
+ /** Both boolean values must survive toBytes followed by bytesToBoolean. */
+ @Test
+ public void testBoolean() throws IOException {
+ Assert.assertTrue(converter.bytesToBoolean(converter.toBytes(true)));
+ Assert.assertFalse(converter.bytesToBoolean(converter.toBytes(false)));
+ }
+
+}
Added: pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloColumns.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloColumns.java?rev=1559022&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloColumns.java (added)
+++ pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloColumns.java Fri Jan 17 06:36:44 2014
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.accumulo;
+
+import org.apache.pig.backend.hadoop.accumulo.Column.Type;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestAccumuloColumns { // how Column parses column specification strings
+ /** A null specification is expected to be rejected with a NullPointerException. */
+ @Test(expected = NullPointerException.class)
+ public void testNull() {
+ new Column(null);
+ }
+ /** The empty string is expected to parse as a LITERAL with an empty family and a null qualifier. */
+ @Test
+ public void testEmptyColumn() {
+ Column c = new Column("");
+
+ Assert.assertEquals(Type.LITERAL, c.getType());
+ Assert.assertEquals("", c.getColumnFamily());
+ Assert.assertEquals(null, c.getColumnQualifier());
+ }
+ /** A lone colon is expected to be rejected with an IllegalArgumentException. */
+ @Test(expected = IllegalArgumentException.class)
+ public void testBlankColfamQual() {
+ new Column(":");
+ }
+ /** A family followed by a wildcard qualifier is expected to give COLQUAL_PREFIX, family "cf", empty qualifier. */
+ @Test
+ public void testColfamWithColqualRegex() {
+ Column c = new Column("cf:*");
+
+ Assert.assertEquals(Type.COLQUAL_PREFIX, c.getType());
+ Assert.assertEquals("cf", c.getColumnFamily());
+ Assert.assertEquals("", c.getColumnQualifier());
+ }
+ /** A family with an empty qualifier is expected to give the same result as the wildcard-qualifier case. */
+ @Test
+ public void testColfamRegexEmptyColqual() {
+ Column c = new Column("cf:");
+
+ Assert.assertEquals(Type.COLQUAL_PREFIX, c.getType());
+ Assert.assertEquals("cf", c.getColumnFamily());
+ Assert.assertEquals("", c.getColumnQualifier());
+ }
+ /** A wildcard family with an empty qualifier is expected to give COLFAM_PREFIX, family "cf", empty qualifier. */
+ @Test
+ public void testColfamRegex() {
+ Column c = new Column("cf*:");
+
+ Assert.assertEquals(Type.COLFAM_PREFIX, c.getType());
+ Assert.assertEquals("cf", c.getColumnFamily());
+ Assert.assertEquals("", c.getColumnQualifier());
+ }
+ /** With wildcards on both sides the test pins COLFAM_PREFIX and a qualifier that still contains its asterisk. */
+ @Test
+ public void testColfamRegexColqualRegex() {
+ // TODO Change test when cf*:cq* is supported
+ Column c = new Column("cf*:cq*");
+
+ Assert.assertEquals(Type.COLFAM_PREFIX, c.getType());
+ Assert.assertEquals("cf", c.getColumnFamily());
+ Assert.assertEquals("cq*", c.getColumnQualifier());
+ }
+}
Added: pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloPigCluster.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloPigCluster.java?rev=1559022&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloPigCluster.java (added)
+++ pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloPigCluster.java Fri Jan 17 06:36:44 2014
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.backend.hadoop.accumulo;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.accumulo.minicluster.MiniAccumuloConfig;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+/** End-to-end test: loads two Accumulo mini-cluster tables through AccumuloStorage and joins them in Pig. */
+public class TestAccumuloPigCluster {
+ // Airport fixture rows; each map key is written as a column family and each map value as the cell value
+ @SuppressWarnings("unchecked")
+ private static final List<ImmutableMap<String, String>> AIRPORTS = Lists
+ .newArrayList(
+ ImmutableMap.of("code", "SJC", "name", "San Jose"),
+ ImmutableMap.of("code", "SFO", "name", "San Francisco"),
+ ImmutableMap.of("code", "MDO", "name", "Orlando"),
+ ImmutableMap.of("code", "MDW", "name", "Chicago-Midway"),
+ ImmutableMap.of("code", "JFK", "name", "JFK International"),
+ ImmutableMap.of("code", "BWI", "name",
+ "Baltimore-Washington"));
+ // Flight fixture rows; every origin equals exactly one airport code above
+ @SuppressWarnings("unchecked")
+ private static final List<ImmutableMap<String, String>> flightData = Lists
+ .newArrayList(
+ ImmutableMap.of("origin", "BWI", "destination", "SFO"),
+ ImmutableMap.of("origin", "BWI", "destination", "SJC"),
+ ImmutableMap.of("origin", "MDW", "destination", "MDO"),
+ ImmutableMap.of("origin", "MDO", "destination", "SJC"),
+ ImmutableMap.of("origin", "SJC", "destination", "JFK"),
+ ImmutableMap.of("origin", "JFK", "destination", "MDW"));
+
+ private static final Logger log = Logger
+ .getLogger(TestAccumuloPigCluster.class);
+ private static final File tmpdir = Files.createTempDir();
+ private static MiniAccumuloCluster accumuloCluster;
+ private static MiniCluster cluster;
+ private static Configuration conf;
+ private PigServer pig;
+ /** Starts a one-tserver MiniAccumuloCluster in tmpdir and obtains the Pig MiniCluster configuration. */
+ @BeforeClass
+ public static void setupClusters() throws Exception {
+ MiniAccumuloConfig macConf = new MiniAccumuloConfig(tmpdir, "password");
+ macConf.setNumTservers(1);
+
+ accumuloCluster = new MiniAccumuloCluster(macConf);
+ accumuloCluster.start();
+
+ // This is needed by Pig
+ cluster = MiniCluster.buildCluster();
+ conf = cluster.getConfiguration();
+ }
+ /** Creates a local-mode PigServer on the shared configuration before each test. */
+ @Before
+ public void beforeTest() throws Exception {
+ pig = new PigServer(ExecType.LOCAL, conf);
+ }
+ /** Stops both clusters and deletes tmpdir; the later steps still run when an earlier one throws. */
+ @AfterClass
+ public static void stopClusters() throws Exception {
+ try {
+ accumuloCluster.stop();
+ } finally {
+ try {
+ // buildCluster() may never have run if Accumulo failed to start
+ if (null != cluster) {
+ cluster.shutDown();
+ }
+ } finally {
+ FileUtils.deleteDirectory(tmpdir);
+ }
+ }
+ }
+ /** Creates the airports and flights tables when absent and writes the fixture rows into them. */
+ private void loadTestData() throws Exception {
+ ZooKeeperInstance inst = new ZooKeeperInstance(
+ accumuloCluster.getInstanceName(),
+ accumuloCluster.getZooKeepers());
+ Connector c = inst.getConnector("root", new PasswordToken("password"));
+
+ TableOperations tops = c.tableOperations();
+ if (!tops.exists("airports")) {
+ tops.create("airports");
+ }
+
+ if (!tops.exists("flights")) {
+ tops.create("flights");
+ }
+
+ BatchWriterConfig config = new BatchWriterConfig();
+ config.setMaxWriteThreads(1);
+ config.setMaxLatency(100000l, TimeUnit.MILLISECONDS);
+ config.setMaxMemory(10000l);
+
+ writeRecords(c, "airports", config, AIRPORTS);
+ writeRecords(c, "flights", config, flightData);
+ }
+ /**
+ * Writes one Mutation per record to the table: the row id is the record's 1-based position and
+ * every map entry becomes a column (family = key, empty qualifier, value = entry value).
+ */
+ private static void writeRecords(Connector c, String table,
+ BatchWriterConfig config,
+ List<? extends Map<String, String>> records) throws Exception {
+ BatchWriter bw = c.createBatchWriter(table, config);
+ try {
+ int i = 1;
+ for (Map<String, String> record : records) {
+ Mutation m = new Mutation(Integer.toString(i));
+
+ for (Entry<String, String> entry : record.entrySet()) {
+ m.put(entry.getKey(), "", entry.getValue());
+ }
+
+ bw.addMutation(m);
+ i++;
+ }
+ } finally {
+ bw.close();
+ }
+ }
+ /** Joins flights to airports on origin = code and checks each joined row pairs a flight with its origin airport. */
+ @Test
+ // (timeout = 60000)
+ public void test() throws Exception {
+ loadTestData();
+
+ final String loadFlights = "flights = LOAD 'accumulo://flights?instance="
+ + accumuloCluster.getInstanceName()
+ + "&user=root&password=password&zookeepers="
+ + accumuloCluster.getZooKeepers()
+ + "' using org.apache.pig.backend.hadoop.accumulo.AccumuloStorage()"
+ + " as (rowKey:chararray, column_map:map[]);";
+
+ final String loadAirports = "airports = LOAD 'accumulo://airports?instance="
+ + accumuloCluster.getInstanceName()
+ + "&user=root&password=password&zookeepers="
+ + accumuloCluster.getZooKeepers()
+ + "' using org.apache.pig.backend.hadoop.accumulo.AccumuloStorage()"
+ + " as (rowKey:chararray, column_map:map[]);";
+
+ final String joinQuery = "joined = JOIN flights BY column_map#'origin', airports BY column_map#'code';";
+
+ pig.registerQuery(loadFlights);
+ pig.registerQuery(loadAirports);
+ pig.registerQuery(joinQuery);
+
+ Iterator<Tuple> it = pig.openIterator("joined");
+
+ int i = 0;
+ while (it.hasNext()) {
+ Tuple t = it.next();
+
+ // id and map for each dataset we joined
+ Assert.assertEquals(4, t.size());
+
+ Object o = t.get(1);
+ // The checks below accept the flight map and the airport map in either position
+ Map<String, String> airport = null, flight = null;
+
+ Assert.assertTrue(Map.class.isAssignableFrom(o.getClass()));
+ @SuppressWarnings("unchecked")
+ Map<String, String> data1 = (Map<String, String>) o;
+ Assert.assertTrue(!data1.isEmpty());
+
+ if (data1.containsKey("origin")) {
+ flight = data1;
+ } else if (data1.containsKey("code")) {
+ airport = data1;
+ } else {
+ Assert.fail("Received map which did not contain an expected key");
+ }
+
+ o = t.get(3);
+
+ Assert.assertTrue(Map.class.isAssignableFrom(o.getClass()));
+ @SuppressWarnings("unchecked")
+ Map<String, String> data2 = (Map<String, String>) o;
+ Assert.assertTrue(!data2.isEmpty());
+ // The second map must be whichever kind the first map was not
+ if (null == flight && data2.containsKey("origin")) {
+ flight = data2;
+ } else if (null == airport && data2.containsKey("code")) {
+ airport = data2;
+ } else {
+ Assert.fail("Received map which did not contain an expected key");
+ }
+
+ Assert.assertTrue(null != airport && null != flight);
+
+ Assert.assertEquals(airport.get("code"), flight.get("origin"));
+
+ i++;
+ }
+ // One joined row is expected per flight fixture row
+ Assert.assertEquals(6, i);
+ }
+
+}
Added: pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloStorage.java?rev=1559022&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloStorage.java (added)
+++ pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloStorage.java Fri Jan 17 06:36:44 2014
@@ -0,0 +1,840 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.backend.hadoop.accumulo;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.commons.cli.ParseException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalMap;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class TestAccumuloStorage {
+
+ @Test // a tuple holding only the row key is expected to yield no mutations
+ public void testWrite1Tuple() throws IOException, ParseException {
+ AccumuloStorage storage = new AccumuloStorage();
+ // Single-field tuple: field 0 is the only value supplied
+ Tuple t = TupleFactory.getInstance().newTuple(1);
+ t.set(0, "row");
+
+ Collection<Mutation> mutations = storage.getMutations(t);
+
+ Assert.assertEquals(0, mutations.size());
+ }
+
+ @Test(expected = ClassCastException.class) // no-arg storage given a String in field 1 must fail with a ClassCastException
+ public void testWriteLiteralAsMap() throws IOException, ParseException {
+ AccumuloStorage storage = new AccumuloStorage();
+ // NOTE(review): presumably the default storage treats field 1 as a Map, which is why a String fails - confirm in AccumuloStorage
+ Tuple t = TupleFactory.getInstance().newTuple(2);
+ t.set(0, "row");
+ t.set(1, "value");
+ // The returned size is discarded; only the expected exception matters
+ storage.getMutations(t).size();
+ }
+
+ @Test(expected = ClassCastException.class) // same expectation as the no-arg storage case, with an explicit "*" column spec
+ public void testWriteLiteralAsMapWithAsterisk() throws IOException,
+ ParseException {
+ AccumuloStorage storage = new AccumuloStorage("*");
+ // NOTE(review): presumably "*" makes the storage treat field 1 as a Map, which is why a String fails - confirm in AccumuloStorage
+ Tuple t = TupleFactory.getInstance().newTuple(2);
+ t.set(0, "row");
+ t.set(1, "value");
+ // The returned size is discarded; only the expected exception matters
+ storage.getMutations(t).size();
+ }
+
+ @Test // column spec "col": field 1 is expected under family "col" with an empty qualifier
+ public void testWrite2TupleWithColumn() throws IOException, ParseException {
+ AccumuloStorage storage = new AccumuloStorage("col");
+
+ Tuple t = TupleFactory.getInstance().newTuple(2);
+ t.set(0, "row");
+ t.set(1, "value");
+
+ Collection<Mutation> mutations = storage.getMutations(t);
+ // Exactly one mutation is expected for the tuple
+ Assert.assertEquals(1, mutations.size());
+
+ Mutation m = mutations.iterator().next();
+ // The mutation's row must equal the bytes of field 0
+ Assert.assertTrue("Rows not equal",
+ Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+ List<ColumnUpdate> colUpdates = m.getUpdates();
+ Assert.assertEquals(1, colUpdates.size());
+ // The single update carries family "col", an empty qualifier and the field 1 value
+ ColumnUpdate colUpdate = colUpdates.get(0);
+ Assert.assertTrue("CF not equal",
+ Arrays.equals(colUpdate.getColumnFamily(), "col".getBytes()));
+ Assert.assertTrue("CQ not equal",
+ Arrays.equals(colUpdate.getColumnQualifier(), new byte[0]));
+ Assert.assertTrue("Values not equal",
+ Arrays.equals(colUpdate.getValue(), "value".getBytes()));
+ }
+
+ @Test // column spec "col:qual": field 1 is expected under family "col" and qualifier "qual"
+ public void testWrite2TupleWithColumnQual() throws IOException,
+ ParseException {
+ AccumuloStorage storage = new AccumuloStorage("col:qual");
+
+ Tuple t = TupleFactory.getInstance().newTuple(2);
+ t.set(0, "row");
+ t.set(1, "value");
+
+ Collection<Mutation> mutations = storage.getMutations(t);
+ // Exactly one mutation is expected for the tuple
+ Assert.assertEquals(1, mutations.size());
+
+ Mutation m = mutations.iterator().next();
+ // The mutation's row must equal the bytes of field 0
+ Assert.assertTrue("Rows not equal",
+ Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+ List<ColumnUpdate> colUpdates = m.getUpdates();
+ Assert.assertEquals(1, colUpdates.size());
+ // The single update carries family "col", qualifier "qual" and the field 1 value
+ ColumnUpdate colUpdate = colUpdates.get(0);
+ Assert.assertTrue("CF not equal",
+ Arrays.equals(colUpdate.getColumnFamily(), "col".getBytes()));
+ Assert.assertTrue("CQ not equal", Arrays.equals(
+ colUpdate.getColumnQualifier(), "qual".getBytes()));
+ Assert.assertTrue("Values not equal",
+ Arrays.equals(colUpdate.getValue(), "value".getBytes()));
+ }
+
+ @Test // four column specs, with and without qualifiers, are expected to map positionally onto fields 1-4
+ public void testWrite2TupleWithMixedColumns() throws IOException,
+ ParseException {
+ AccumuloStorage storage = new AccumuloStorage(
+ "col1,col1:qual,col2:qual,col2");
+
+ Tuple t = TupleFactory.getInstance().newTuple(5);
+ t.set(0, "row");
+ t.set(1, "value1");
+ t.set(2, "value2");
+ t.set(3, "value3");
+ t.set(4, "value4");
+
+ Collection<Mutation> mutations = storage.getMutations(t);
+ // All four columns are expected in a single mutation for the row
+ Assert.assertEquals(1, mutations.size());
+
+ Mutation m = mutations.iterator().next();
+
+ Assert.assertTrue("Rows not equal",
+ Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+ List<ColumnUpdate> colUpdates = m.getUpdates();
+ Assert.assertEquals(4, colUpdates.size());
+ // Update 0 corresponds to spec "col1" and field 1
+ ColumnUpdate colUpdate = colUpdates.get(0);
+ Assert.assertTrue("CF not equal",
+ Arrays.equals(colUpdate.getColumnFamily(), "col1".getBytes()));
+ Assert.assertTrue("CQ not equal",
+ Arrays.equals(colUpdate.getColumnQualifier(), new byte[0]));
+ Assert.assertTrue("Values not equal",
+ Arrays.equals(colUpdate.getValue(), "value1".getBytes()));
+ // Update 1 corresponds to spec "col1:qual" and field 2
+ colUpdate = colUpdates.get(1);
+ Assert.assertTrue("CF not equal",
+ Arrays.equals(colUpdate.getColumnFamily(), "col1".getBytes()));
+ Assert.assertTrue("CQ not equal", Arrays.equals(
+ colUpdate.getColumnQualifier(), "qual".getBytes()));
+ Assert.assertTrue("Values not equal",
+ Arrays.equals(colUpdate.getValue(), "value2".getBytes()));
+ // Update 2 corresponds to spec "col2:qual" and field 3
+ colUpdate = colUpdates.get(2);
+ Assert.assertTrue("CF not equal",
+ Arrays.equals(colUpdate.getColumnFamily(), "col2".getBytes()));
+ Assert.assertTrue("CQ not equal", Arrays.equals(
+ colUpdate.getColumnQualifier(), "qual".getBytes()));
+ Assert.assertTrue("Values not equal",
+ Arrays.equals(colUpdate.getValue(), "value3".getBytes()));
+ // Update 3 corresponds to spec "col2" and field 4
+ colUpdate = colUpdates.get(3);
+ Assert.assertTrue("CF not equal",
+ Arrays.equals(colUpdate.getColumnFamily(), "col2".getBytes()));
+ Assert.assertTrue("CQ not equal",
+ Arrays.equals(colUpdate.getColumnQualifier(), new byte[0]));
+ Assert.assertTrue("Values not equal",
+ Arrays.equals(colUpdate.getValue(), "value4".getBytes()));
+ }
+
+ @Test // a tuple field with no matching column spec is expected to be left out of the mutation
+ public void testWriteIgnoredExtraColumns() throws IOException,
+ ParseException {
+ AccumuloStorage storage = new AccumuloStorage("col");
+ // One column spec but two value fields: field 2 has no spec
+ Tuple t = TupleFactory.getInstance().newTuple(3);
+ t.set(0, "row");
+ t.set(1, "value1");
+ t.set(2, "value2");
+
+ Collection<Mutation> mutations = storage.getMutations(t);
+
+ Assert.assertEquals(1, mutations.size());
+
+ Mutation m = mutations.iterator().next();
+
+ Assert.assertTrue("Rows not equal",
+ Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+ List<ColumnUpdate> colUpdates = m.getUpdates();
+ Assert.assertEquals(1, colUpdates.size());
+ // Only the field 1 value is expected, under family "col" with an empty qualifier
+ ColumnUpdate colUpdate = colUpdates.get(0);
+ Assert.assertTrue("CF not equal",
+ Arrays.equals(colUpdate.getColumnFamily(), "col".getBytes()));
+ Assert.assertTrue("CQ not equal",
+ Arrays.equals(colUpdate.getColumnQualifier(), new byte[0]));
+ Assert.assertTrue("Values not equal",
+ Arrays.equals(colUpdate.getValue(), "value1".getBytes()));
+ }
+
+ @Test
+ public void testWriteIgnoredExtraMap() throws IOException, ParseException {
+ AccumuloStorage storage = new AccumuloStorage("col1");
+
+ Map<String, Object> map = Maps.newHashMap();
+
+ map.put("mapcol1", "mapval1");
+ map.put("mapcol2", "mapval2");
+ map.put("mapcol3", "mapval3");
+ map.put("mapcol4", "mapval4");
+
+ Tuple t = TupleFactory.getInstance().newTuple(3);
+ t.set(0, "row");
+ t.set(1, "value1");
+ t.set(2, map);
+
+ Collection<Mutation> mutations = storage.getMutations(t);
+
+ Assert.assertEquals(1, mutations.size());
+
+ Mutation m = mutations.iterator().next();
+
+ Assert.assertTrue("Rows not equal",
+ Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+ List<ColumnUpdate> colUpdates = m.getUpdates();
+ Assert.assertEquals(1, colUpdates.size());
+
+ ColumnUpdate update = colUpdates.get(0);
+ Assert.assertEquals("col1", new String(update.getColumnFamily()));
+ Assert.assertEquals("", new String(update.getColumnQualifier()));
+ Assert.assertEquals("value1", new String(update.getValue()));
+ }
+
+ @Test
+ public void testWriteMultipleColumnsWithNonExpandedMap()
+ throws IOException, ParseException {
+ AccumuloStorage storage = new AccumuloStorage("col1,col2");
+
+ Map<String, Object> map = Maps.newHashMap();
+
+ map.put("mapcol1", "mapval1");
+ map.put("mapcol2", "mapval2");
+ map.put("mapcol3", "mapval3");
+ map.put("mapcol4", "mapval4");
+
+ Tuple t = TupleFactory.getInstance().newTuple(3);
+ t.set(0, "row");
+ t.set(1, "value1");
+ t.set(2, map);
+
+ Collection<Mutation> mutations = storage.getMutations(t);
+
+ Assert.assertEquals(1, mutations.size());
+
+ Mutation m = mutations.iterator().next();
+
+ Assert.assertTrue("Rows not equal",
+ Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+ List<ColumnUpdate> colUpdates = m.getUpdates();
+ Assert.assertEquals(2, colUpdates.size());
+
+ ColumnUpdate update = colUpdates.get(0);
+ Assert.assertEquals("col1", new String(update.getColumnFamily()));
+ Assert.assertEquals("", new String(update.getColumnQualifier()));
+ Assert.assertEquals("value1", new String(update.getValue()));
+
+ update = colUpdates.get(1);
+ Assert.assertEquals("col2", new String(update.getColumnFamily()));
+ Assert.assertEquals("", new String(update.getColumnQualifier()));
+ Assert.assertArrayEquals(storage.objToBytes(map, DataType.MAP),
+ update.getValue());
+ }
+
+ /**
+ * With the specification "col1,col2:" the scalar goes to col1 and the map
+ * aligned with "col2:" is expanded into one update per map entry, using the
+ * map key as the column qualifier. Five updates are expected in total.
+ */
+ @Test
+ public void testWriteMultipleColumnsWithExpandedMap() throws IOException,
+ ParseException {
+ AccumuloStorage storage = new AccumuloStorage("col1,col2:");
+
+ Map<String, Object> map = Maps.newHashMap();
+
+ map.put("mapcol1", "mapval1");
+ map.put("mapcol2", "mapval2");
+ map.put("mapcol3", "mapval3");
+ map.put("mapcol4", "mapval4");
+
+ Tuple t = TupleFactory.getInstance().newTuple(3);
+ t.set(0, "row");
+ t.set(1, "value1");
+ t.set(2, map);
+
+ Collection<Mutation> mutations = storage.getMutations(t);
+
+ Assert.assertEquals(1, mutations.size());
+
+ Mutation m = mutations.iterator().next();
+
+ Assert.assertTrue("Rows not equal",
+ Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+ List<ColumnUpdate> colUpdates = m.getUpdates();
+ Assert.assertEquals(5, colUpdates.size());
+
+ // Index 0 is the scalar column
+ ColumnUpdate update = colUpdates.get(0);
+ Assert.assertEquals("col1", new String(update.getColumnFamily()));
+ Assert.assertEquals("", new String(update.getColumnQualifier()));
+ Assert.assertEquals("value1", new String(update.getValue()));
+
+ // Expected (family, qualifier) -> value for each expanded map entry
+ Map<Entry<String, String>, String> expectations = Maps.newHashMap();
+ expectations.put(Maps.immutableEntry("col2", "mapcol1"), "mapval1");
+ expectations.put(Maps.immutableEntry("col2", "mapcol2"), "mapval2");
+ expectations.put(Maps.immutableEntry("col2", "mapcol3"), "mapval3");
+ expectations.put(Maps.immutableEntry("col2", "mapcol4"), "mapval4");
+
+ // Indexes 1..4 are matched by key, not by position
+ for (int i = 1; i < 5; i++) {
+ update = colUpdates.get(i);
+ Entry<String, String> key = Maps.immutableEntry(
+ new String(update.getColumnFamily()),
+ new String(update.getColumnQualifier()));
+ Assert.assertTrue("Did not find expected key: " + key,
+ expectations.containsKey(key));
+
+ // remove() returns the expected value and marks the key as seen, so
+ // a duplicate update would fail the containsKey check above
+ String expected = expectations.remove(key);
+ String actual = new String(update.getValue());
+ Assert.assertEquals(expected, actual);
+ }
+
+ Assert.assertTrue("Did not find all expectations",
+ expectations.isEmpty());
+ }
+
+ /**
+ * With the specification "col:" a map in field 1 is expanded into one update
+ * per entry: family "col", qualifier equal to the map key, value equal to
+ * the map value.
+ */
+ @Test
+ public void testWriteMapWithColFamWithColon() throws IOException,
+ ParseException {
+ AccumuloStorage storage = new AccumuloStorage("col:");
+
+ Map<String, Object> map = Maps.newHashMap();
+
+ map.put("mapcol1", "mapval1");
+ map.put("mapcol2", "mapval2");
+ map.put("mapcol3", "mapval3");
+ map.put("mapcol4", "mapval4");
+
+ Tuple t = TupleFactory.getInstance().newTuple(2);
+ t.set(0, "row");
+ t.set(1, map);
+
+ Collection<Mutation> mutations = storage.getMutations(t);
+
+ Assert.assertEquals(1, mutations.size());
+
+ Mutation m = mutations.iterator().next();
+
+ Assert.assertTrue("Rows not equal",
+ Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+ List<ColumnUpdate> colUpdates = m.getUpdates();
+ Assert.assertEquals(4, colUpdates.size());
+
+ // Expected (family, qualifier) -> value for each map entry
+ Map<Entry<String, String>, String> expectations = Maps.newHashMap();
+ expectations.put(Maps.immutableEntry("col", "mapcol1"), "mapval1");
+ expectations.put(Maps.immutableEntry("col", "mapcol2"), "mapval2");
+ expectations.put(Maps.immutableEntry("col", "mapcol3"), "mapval3");
+ expectations.put(Maps.immutableEntry("col", "mapcol4"), "mapval4");
+
+ // Updates are matched by key, not by position
+ for (ColumnUpdate update : colUpdates) {
+ Entry<String, String> key = Maps.immutableEntry(
+ new String(update.getColumnFamily()),
+ new String(update.getColumnQualifier()));
+ Assert.assertTrue("Did not find expected key: " + key,
+ expectations.containsKey(key));
+
+ // remove() returns the expected value and marks the key as seen
+ String expected = expectations.remove(key);
+ String actual = new String(update.getValue());
+ Assert.assertEquals(expected, actual);
+ }
+
+ Assert.assertTrue("Did not find all expectations",
+ expectations.isEmpty());
+ }
+
+ /**
+ * The specification "col:*" is expected to yield the same four updates as
+ * "col:" does in the sibling test: family "col", qualifier equal to the map
+ * key, value equal to the map value.
+ */
+ @Test
+ public void testWriteMapWithColFamWithColonAsterisk() throws IOException,
+ ParseException {
+ AccumuloStorage storage = new AccumuloStorage("col:*");
+
+ Map<String, Object> map = Maps.newHashMap();
+
+ map.put("mapcol1", "mapval1");
+ map.put("mapcol2", "mapval2");
+ map.put("mapcol3", "mapval3");
+ map.put("mapcol4", "mapval4");
+
+ Tuple t = TupleFactory.getInstance().newTuple(2);
+ t.set(0, "row");
+ t.set(1, map);
+
+ Collection<Mutation> mutations = storage.getMutations(t);
+
+ Assert.assertEquals(1, mutations.size());
+
+ Mutation m = mutations.iterator().next();
+
+ Assert.assertTrue("Rows not equal",
+ Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+ List<ColumnUpdate> colUpdates = m.getUpdates();
+ Assert.assertEquals(4, colUpdates.size());
+
+ // Expected (family, qualifier) -> value for each map entry
+ Map<Entry<String, String>, String> expectations = Maps.newHashMap();
+ expectations.put(Maps.immutableEntry("col", "mapcol1"), "mapval1");
+ expectations.put(Maps.immutableEntry("col", "mapcol2"), "mapval2");
+ expectations.put(Maps.immutableEntry("col", "mapcol3"), "mapval3");
+ expectations.put(Maps.immutableEntry("col", "mapcol4"), "mapval4");
+
+ // Updates are matched by key, not by position
+ for (ColumnUpdate update : colUpdates) {
+ Entry<String, String> key = Maps.immutableEntry(
+ new String(update.getColumnFamily()),
+ new String(update.getColumnQualifier()));
+ Assert.assertTrue("Did not find expected key: " + key,
+ expectations.containsKey(key));
+
+ // remove() returns the expected value and marks the key as seen
+ String expected = expectations.remove(key);
+ String actual = new String(update.getValue());
+ Assert.assertEquals(expected, actual);
+ }
+
+ Assert.assertTrue("Did not find all expectations",
+ expectations.isEmpty());
+ }
+
+ /**
+ * With the specification "col:qual_*" each map entry is expected to become
+ * an update in family "col" whose qualifier is the map key prefixed with
+ * "qual_".
+ */
+ @Test
+ public void testWriteMapWithColFamColQualPrefix() throws IOException,
+ ParseException {
+ AccumuloStorage storage = new AccumuloStorage("col:qual_*");
+
+ Map<String, Object> map = Maps.newHashMap();
+
+ map.put("mapcol1", "mapval1");
+ map.put("mapcol2", "mapval2");
+ map.put("mapcol3", "mapval3");
+ map.put("mapcol4", "mapval4");
+
+ Tuple t = TupleFactory.getInstance().newTuple(2);
+ t.set(0, "row");
+ t.set(1, map);
+
+ Collection<Mutation> mutations = storage.getMutations(t);
+
+ Assert.assertEquals(1, mutations.size());
+
+ Mutation m = mutations.iterator().next();
+
+ Assert.assertTrue("Rows not equal",
+ Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+ List<ColumnUpdate> colUpdates = m.getUpdates();
+ Assert.assertEquals(4, colUpdates.size());
+
+ // Expected (family, prefixed qualifier) -> value for each map entry
+ Map<Entry<String, String>, String> expectations = Maps.newHashMap();
+ expectations.put(Maps.immutableEntry("col", "qual_mapcol1"), "mapval1");
+ expectations.put(Maps.immutableEntry("col", "qual_mapcol2"), "mapval2");
+ expectations.put(Maps.immutableEntry("col", "qual_mapcol3"), "mapval3");
+ expectations.put(Maps.immutableEntry("col", "qual_mapcol4"), "mapval4");
+
+ // Updates are matched by key, not by position
+ for (ColumnUpdate update : colUpdates) {
+ Entry<String, String> key = Maps.immutableEntry(
+ new String(update.getColumnFamily()),
+ new String(update.getColumnQualifier()));
+ // Same failure message as the sibling map-expansion tests
+ Assert.assertTrue("Did not find expected key: " + key,
+ expectations.containsKey(key));
+
+ // remove() returns the expected value and marks the key as seen
+ String expected = expectations.remove(key);
+ String actual = new String(update.getValue());
+ Assert.assertEquals(expected, actual);
+ }
+
+ Assert.assertTrue("Did not find all expectations",
+ expectations.isEmpty());
+ }
+
+ /**
+ * With no column specification, a row holding one entry (empty column
+ * family, qualifier "col1") is expected to be read as a two-field tuple: the
+ * row id followed by a map whose only key is ":col1".
+ */
+ @Test
+ public void testReadSingleKey() throws IOException, ParseException {
+ AccumuloStorage storage = new AccumuloStorage();
+
+ List<Key> rowKeys = Lists.newArrayList();
+ List<Value> rowValues = Lists.newArrayList();
+ rowKeys.add(new Key("1", "", "col1"));
+ rowValues.add(new Value("value1".getBytes()));
+
+ // Encode the whole row into a single Value, keyed by the bare row id
+ Key rowId = new Key("1");
+ Value encodedRow = WholeRowIterator.encodeRow(rowKeys, rowValues);
+
+ Tuple result = storage.getTuple(rowId, encodedRow);
+
+ Assert.assertEquals(2, result.size());
+ Assert.assertEquals("1", result.get(0).toString());
+
+ InternalMap expected = new InternalMap();
+ expected.put(":col1", new DataByteArray("value1"));
+ Assert.assertEquals(expected, result.get(1));
+ }
+
+ /**
+ * With no column specification, three qualifiers under the single family
+ * "col1" are expected to be collected into one map keyed by
+ * "family:qualifier", placed after the row id in a two-field tuple.
+ */
+ @Test
+ public void testReadSingleColumn() throws IOException, ParseException {
+ AccumuloStorage storage = new AccumuloStorage();
+
+ List<Key> rowKeys = Lists.newArrayList();
+ List<Value> rowValues = Lists.newArrayList();
+
+ // Each key is paired with the value added right after it
+ rowKeys.add(new Key("1", "col1", "cq1"));
+ rowValues.add(new Value("value1".getBytes()));
+ rowKeys.add(new Key("1", "col1", "cq2"));
+ rowValues.add(new Value("value2".getBytes()));
+ rowKeys.add(new Key("1", "col1", "cq3"));
+ rowValues.add(new Value("value3".getBytes()));
+
+ Key rowId = new Key("1");
+ Value encodedRow = WholeRowIterator.encodeRow(rowKeys, rowValues);
+
+ Tuple result = storage.getTuple(rowId, encodedRow);
+
+ Assert.assertEquals(2, result.size());
+ Assert.assertEquals("1", result.get(0).toString());
+
+ InternalMap expected = new InternalMap();
+ expected.put("col1:cq1", new DataByteArray("value1"));
+ expected.put("col1:cq2", new DataByteArray("value2"));
+ expected.put("col1:cq3", new DataByteArray("value3"));
+ Assert.assertEquals(expected, result.get(1));
+ }
+
+ /**
+ * The specification "*" is expected to gather every entry of the row, across
+ * all three families, into a single map keyed by "family:qualifier".
+ */
+ @Test
+ public void testReadMultipleColumnsAggregateColfamsAsterisk()
+ throws IOException, ParseException {
+ AccumuloStorage storage = new AccumuloStorage("*");
+
+ List<Key> rowKeys = Lists.newArrayList();
+ List<Value> rowValues = Lists.newArrayList();
+
+ // Each key is paired with the value added right after it
+ rowKeys.add(new Key("1", "col1", "cq1"));
+ rowValues.add(new Value("value1".getBytes()));
+ rowKeys.add(new Key("1", "col1", "cq2"));
+ rowValues.add(new Value("value2".getBytes()));
+ rowKeys.add(new Key("1", "col1", "cq3"));
+ rowValues.add(new Value("value3".getBytes()));
+ rowKeys.add(new Key("1", "col2", "cq1"));
+ rowValues.add(new Value("value1".getBytes()));
+ rowKeys.add(new Key("1", "col3", "cq1"));
+ rowValues.add(new Value("value1".getBytes()));
+ rowKeys.add(new Key("1", "col3", "cq2"));
+ rowValues.add(new Value("value2".getBytes()));
+
+ Key rowId = new Key("1");
+ Value encodedRow = WholeRowIterator.encodeRow(rowKeys, rowValues);
+
+ Tuple result = storage.getTuple(rowId, encodedRow);
+
+ Assert.assertEquals(2, result.size());
+ Assert.assertEquals("1", result.get(0).toString());
+
+ InternalMap expected = new InternalMap();
+ expected.put("col1:cq1", new DataByteArray("value1"));
+ expected.put("col1:cq2", new DataByteArray("value2"));
+ expected.put("col1:cq3", new DataByteArray("value3"));
+ expected.put("col2:cq1", new DataByteArray("value1"));
+ expected.put("col3:cq1", new DataByteArray("value1"));
+ expected.put("col3:cq2", new DataByteArray("value2"));
+ Assert.assertEquals(expected, result.get(1));
+ }
+
+ /**
+ * With the specification "*", entries whose qualifier is empty are expected
+ * to appear in the map under the bare family name, with no ":" separator.
+ * <p>
+ * NOTE(review): despite the method name, the empty component in these keys
+ * is the third Key argument (the qualifier), not the family.
+ */
+ @Test
+ public void testReadMultipleColumnsAggregateColfamsAsteriskEmptyColfam()
+ throws IOException, ParseException {
+ AccumuloStorage storage = new AccumuloStorage("*");
+
+ List<Key> rowKeys = Lists.newArrayList();
+ List<Value> rowValues = Lists.newArrayList();
+
+ // Each key is paired with the value added right after it
+ rowKeys.add(new Key("1", "col1", ""));
+ rowValues.add(new Value("value1".getBytes()));
+ rowKeys.add(new Key("1", "col2", ""));
+ rowValues.add(new Value("value2".getBytes()));
+ rowKeys.add(new Key("1", "col3", ""));
+ rowValues.add(new Value("value3".getBytes()));
+
+ Key rowId = new Key("1");
+ Value encodedRow = WholeRowIterator.encodeRow(rowKeys, rowValues);
+
+ Tuple result = storage.getTuple(rowId, encodedRow);
+
+ Assert.assertEquals(2, result.size());
+ Assert.assertEquals("1", result.get(0).toString());
+
+ InternalMap expected = new InternalMap();
+ expected.put("col1", new DataByteArray("value1"));
+ expected.put("col2", new DataByteArray("value2"));
+ expected.put("col3", new DataByteArray("value3"));
+ Assert.assertEquals(expected, result.get(1));
+ }
+
+ /**
+ * An empty-string column specification is expected to give the same result
+ * as the sibling "*" test over the same six entries: one map holding every
+ * entry of the row, keyed by "family:qualifier".
+ */
+ @Test
+ public void testReadMultipleColumnsEmptyString() throws IOException,
+ ParseException {
+ AccumuloStorage storage = new AccumuloStorage("");
+
+ List<Key> rowKeys = Lists.newArrayList();
+ List<Value> rowValues = Lists.newArrayList();
+
+ // Each key is paired with the value added right after it
+ rowKeys.add(new Key("1", "col1", "cq1"));
+ rowValues.add(new Value("value1".getBytes()));
+ rowKeys.add(new Key("1", "col1", "cq2"));
+ rowValues.add(new Value("value2".getBytes()));
+ rowKeys.add(new Key("1", "col1", "cq3"));
+ rowValues.add(new Value("value3".getBytes()));
+ rowKeys.add(new Key("1", "col2", "cq1"));
+ rowValues.add(new Value("value1".getBytes()));
+ rowKeys.add(new Key("1", "col3", "cq1"));
+ rowValues.add(new Value("value1".getBytes()));
+ rowKeys.add(new Key("1", "col3", "cq2"));
+ rowValues.add(new Value("value2".getBytes()));
+
+ Key rowId = new Key("1");
+ Value encodedRow = WholeRowIterator.encodeRow(rowKeys, rowValues);
+
+ Tuple result = storage.getTuple(rowId, encodedRow);
+
+ Assert.assertEquals(2, result.size());
+ Assert.assertEquals("1", result.get(0).toString());
+
+ InternalMap expected = new InternalMap();
+ expected.put("col1:cq1", new DataByteArray("value1"));
+ expected.put("col1:cq2", new DataByteArray("value2"));
+ expected.put("col1:cq3", new DataByteArray("value3"));
+ expected.put("col2:cq1", new DataByteArray("value1"));
+ expected.put("col3:cq1", new DataByteArray("value1"));
+ expected.put("col3:cq2", new DataByteArray("value2"));
+ Assert.assertEquals(expected, result.get(1));
+ }
+
+ /**
+ * A storage built with the no-argument constructor is expected to collect
+ * all six entries of the row, spread over three families, into one map keyed
+ * by "family:qualifier".
+ */
+ @Test
+ public void testReadMultipleColumnsNoColfamAggregate() throws IOException,
+ ParseException {
+ AccumuloStorage storage = new AccumuloStorage();
+
+ List<Key> rowKeys = Lists.newArrayList();
+ List<Value> rowValues = Lists.newArrayList();
+
+ // Each key is paired with the value added right after it
+ rowKeys.add(new Key("1", "col1", "cq1"));
+ rowValues.add(new Value("value1".getBytes()));
+ rowKeys.add(new Key("1", "col1", "cq2"));
+ rowValues.add(new Value("value2".getBytes()));
+ rowKeys.add(new Key("1", "col1", "cq3"));
+ rowValues.add(new Value("value3".getBytes()));
+ rowKeys.add(new Key("1", "col2", "cq1"));
+ rowValues.add(new Value("value1".getBytes()));
+ rowKeys.add(new Key("1", "col3", "cq1"));
+ rowValues.add(new Value("value1".getBytes()));
+ rowKeys.add(new Key("1", "col3", "cq2"));
+ rowValues.add(new Value("value2".getBytes()));
+
+ Key rowId = new Key("1");
+ Value encodedRow = WholeRowIterator.encodeRow(rowKeys, rowValues);
+
+ Tuple result = storage.getTuple(rowId, encodedRow);
+
+ Assert.assertEquals(2, result.size());
+ Assert.assertEquals("1", result.get(0).toString());
+
+ InternalMap expected = new InternalMap();
+ expected.put("col1:cq1", new DataByteArray("value1"));
+ expected.put("col1:cq2", new DataByteArray("value2"));
+ expected.put("col1:cq3", new DataByteArray("value3"));
+ expected.put("col2:cq1", new DataByteArray("value1"));
+ expected.put("col3:cq1", new DataByteArray("value1"));
+ expected.put("col3:cq2", new DataByteArray("value2"));
+ Assert.assertEquals(expected, result.get(1));
+ }
+
+ /**
+ * Three plain columns ("col1,col3,col5") are expected to be read as three
+ * scalar fields following the row id, in the order of the specification.
+ */
+ @Test
+ public void testReadMultipleScalars() throws IOException, ParseException {
+ AccumuloStorage storage = new AccumuloStorage("col1,col3,col5");
+
+ List<Key> rowKeys = Lists.newArrayList();
+ List<Value> rowValues = Lists.newArrayList();
+
+ // Filtering by AccumuloInputFormat isn't applied here since we're
+ // shortcircuiting things, so entries for col2 and col4 are simply left
+ // out of the encoded row.
+ rowKeys.add(new Key("1", "col1", ""));
+ rowValues.add(new Value("value1".getBytes()));
+ rowKeys.add(new Key("1", "col3", ""));
+ rowValues.add(new Value("value3".getBytes()));
+ rowKeys.add(new Key("1", "col5", ""));
+ rowValues.add(new Value("value5".getBytes()));
+
+ Key rowId = new Key("1");
+ Value encodedRow = WholeRowIterator.encodeRow(rowKeys, rowValues);
+
+ Tuple result = storage.getTuple(rowId, encodedRow);
+
+ Assert.assertEquals(4, result.size());
+ Assert.assertEquals("1", result.get(0).toString());
+
+ Assert.assertEquals("value1", result.get(1).toString());
+ Assert.assertEquals("value3", result.get(2).toString());
+ Assert.assertEquals("value5", result.get(3).toString());
+ }
+
+ /**
+ * Tuple fields are expected to follow the order of the column specification
+ * ("z,a"), not the order in which the entries sit in the encoded row (a
+ * before z).
+ */
+ @Test
+ public void testUnsortedColumnList() throws IOException, ParseException {
+ AccumuloStorage storage = new AccumuloStorage("z,a");
+
+ List<Key> rowKeys = Lists.newArrayList();
+ List<Value> rowValues = Lists.newArrayList();
+
+ // Each key is paired with the value added right after it
+ rowKeys.add(new Key("1", "a", ""));
+ rowValues.add(new Value("a".getBytes()));
+ rowKeys.add(new Key("1", "z", ""));
+ rowValues.add(new Value("z".getBytes()));
+
+ Key rowId = new Key("1");
+ Value encodedRow = WholeRowIterator.encodeRow(rowKeys, rowValues);
+
+ Tuple result = storage.getTuple(rowId, encodedRow);
+
+ Assert.assertEquals(3, result.size());
+
+ // Field 1 belongs to "z" and field 2 to "a", as listed in the spec
+ Assert.assertEquals("z", result.get(1).toString());
+ Assert.assertEquals("a", result.get(2).toString());
+ }
+
+ /**
+ * Mixes scalar and map columns in one specification ("z,r:,m:2,b:"). The
+ * expected tuple has five fields: the row id, the scalar z, a map of all
+ * entries in family r, the scalar m:2 and a map of all entries in family b.
+ * Entries in families a and f, and the entry m:1, are present in the row but
+ * not named by the specification.
+ */
+ @Test
+ public void testReadMultipleScalarsAndMaps() throws IOException,
+ ParseException {
+ AccumuloStorage storage = new AccumuloStorage("z,r:,m:2,b:");
+
+ List<Key> rowKeys = Lists.newArrayList();
+ List<Value> rowValues = Lists.newArrayList();
+
+ // Each key is paired with the value added right after it
+ rowKeys.add(new Key("1", "a", "1"));
+ rowValues.add(new Value("a1".getBytes()));
+ rowKeys.add(new Key("1", "b", "1"));
+ rowValues.add(new Value("b1".getBytes()));
+ rowKeys.add(new Key("1", "b", "2"));
+ rowValues.add(new Value("b2".getBytes()));
+ rowKeys.add(new Key("1", "f", "1"));
+ rowValues.add(new Value("f1".getBytes()));
+ rowKeys.add(new Key("1", "f", "2"));
+ rowValues.add(new Value("f2".getBytes()));
+ rowKeys.add(new Key("1", "m", "1"));
+ rowValues.add(new Value("m1".getBytes()));
+ rowKeys.add(new Key("1", "m", "2"));
+ rowValues.add(new Value("m2".getBytes()));
+ rowKeys.add(new Key("1", "r", "1"));
+ rowValues.add(new Value("r1".getBytes()));
+ rowKeys.add(new Key("1", "r", "2"));
+ rowValues.add(new Value("r2".getBytes()));
+ rowKeys.add(new Key("1", "r", "3"));
+ rowValues.add(new Value("r3".getBytes()));
+ rowKeys.add(new Key("1", "z", ""));
+ rowValues.add(new Value("z1".getBytes()));
+
+ Key rowId = new Key("1");
+ Value encodedRow = WholeRowIterator.encodeRow(rowKeys, rowValues);
+
+ Tuple result = storage.getTuple(rowId, encodedRow);
+
+ Assert.assertEquals(5, result.size());
+
+ // Field 1: scalar "z"
+ Assert.assertEquals(new DataByteArray("z1".getBytes()), result.get(1));
+
+ // Field 2: every entry of family "r"
+ HashMap<String, DataByteArray> expectedR = new HashMap<String, DataByteArray>();
+ expectedR.put("r:1", new DataByteArray("r1".getBytes()));
+ expectedR.put("r:2", new DataByteArray("r2".getBytes()));
+ expectedR.put("r:3", new DataByteArray("r3".getBytes()));
+ Assert.assertEquals(expectedR, result.get(2));
+
+ // Field 3: scalar "m:2"
+ Assert.assertEquals(new DataByteArray("m2".getBytes()), result.get(3));
+
+ // Field 4: every entry of family "b"
+ HashMap<String, DataByteArray> expectedB = new HashMap<String, DataByteArray>();
+ expectedB.put("b:1", new DataByteArray("b1".getBytes()));
+ expectedB.put("b:2", new DataByteArray("b2".getBytes()));
+ Assert.assertEquals(expectedB, result.get(4));
+ }
+
+}
Added: pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloStorageConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloStorageConfiguration.java?rev=1559022&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloStorageConfiguration.java (added)
+++ pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloStorageConfiguration.java Fri Jan 17 06:36:44 2014
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.backend.hadoop.accumulo;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestAccumuloStorageConfiguration {
+
+ protected Configuration original;
+ protected AccumuloStorage storage;
+
+ @Before
+ public void setup() throws ParseException, IOException {
+ storage = new AccumuloStorage();
+
+ original = new Configuration();
+
+ original.set("string1", "value1");
+ original.set("string2", "value2");
+ original.set("string3", "value3");
+ original.setBoolean("boolean", true);
+ original.setLong("long", 10);
+ original.setInt("integer", 20);
+ }
+
+ protected Map<String, String> getContents(Configuration conf) {
+ Map<String, String> contents = new HashMap<String, String>();
+ Iterator<Entry<String, String>> iter = conf.iterator();
+ while (iter.hasNext()) {
+ Entry<String, String> entry = iter.next();
+
+ contents.put(entry.getKey(), entry.getValue());
+ }
+
+ return contents;
+ }
+
+ @Test
+ public void testClearEquivalenceStrings() {
+ Configuration clearCopy = new Configuration(original);
+
+ Assert.assertEquals(getContents(original), getContents(clearCopy));
+
+ Map<String, String> entriesToUnset = new HashMap<String, String>();
+ entriesToUnset.put("string1", "foo");
+ entriesToUnset.put("string3", "bar");
+
+ storage.clearUnset(clearCopy, entriesToUnset);
+
+ Configuration originalCopy = new Configuration();
+ for (Entry<String, String> entry : original) {
+ if (!"string1".equals(entry.getKey())
+ && !"string3".equals(entry.getKey())) {
+ originalCopy.set(entry.getKey(), entry.getValue());
+ }
+ }
+
+ Assert.assertEquals(getContents(originalCopy), getContents(clearCopy));
+ }
+
+ @Test
+ public void testClearEquivalenceOnTypes() {
+ Configuration clearCopy = new Configuration(original);
+
+ Assert.assertEquals(getContents(original), getContents(clearCopy));
+
+ Map<String, String> entriesToUnset = new HashMap<String, String>();
+ entriesToUnset.put("long", "foo");
+ entriesToUnset.put("boolean", "bar");
+ entriesToUnset.put("integer", "foobar");
+
+ storage.clearUnset(clearCopy, entriesToUnset);
+
+ Configuration originalCopy = new Configuration();
+ for (Entry<String, String> entry : original) {
+ if (!"long".equals(entry.getKey())
+ && !"boolean".equals(entry.getKey())
+ && !"integer".equals(entry.getKey())) {
+ originalCopy.set(entry.getKey(), entry.getValue());
+ }
+ }
+
+ Assert.assertEquals(getContents(originalCopy), getContents(clearCopy));
+ }
+
+ @Ignore
+ // Ignored until dependency gets updated to Hadoop >=1.2.0 and we have
+ // Configuration#unset
+ @Test
+ public void testUnsetEquivalenceStrings() {
+ Configuration unsetCopy = new Configuration(original);
+
+ Assert.assertEquals(getContents(original), getContents(unsetCopy));
+
+ Map<String, String> entriesToUnset = new HashMap<String, String>();
+ entriesToUnset.put("string1", "foo");
+ entriesToUnset.put("string3", "bar");
+
+ storage.simpleUnset(unsetCopy, entriesToUnset);
+
+ Configuration originalCopy = new Configuration();
+ for (Entry<String, String> entry : original) {
+ if (!"string1".equals(entry.getKey())
+ && !"string2".equals(entry.getKey())) {
+ originalCopy.set(entry.getKey(), entry.getValue());
+ }
+ }
+
+ Assert.assertEquals(getContents(originalCopy), getContents(unsetCopy));
+ }
+
+ @Ignore
+ // Ignored until dependency gets updated to Hadoop >=1.2.0 and we have
+ // Configuration#unset
+ @Test
+ public void testEquivalenceStrings() {
+ Configuration unsetCopy = new Configuration(original), clearCopy = new Configuration(
+ original);
+
+ Assert.assertEquals(getContents(unsetCopy), getContents(clearCopy));
+
+ Map<String, String> entriesToUnset = new HashMap<String, String>();
+ entriesToUnset.put("string1", "foo");
+ entriesToUnset.put("string3", "bar");
+
+ storage.simpleUnset(unsetCopy, entriesToUnset);
+ storage.clearUnset(clearCopy, entriesToUnset);
+
+ Assert.assertEquals(getContents(unsetCopy), getContents(clearCopy));
+
+ Configuration originalCopy = new Configuration();
+ for (Entry<String, String> entry : original) {
+ if (!"string1".equals(entry.getKey())
+ && !"string2".equals(entry.getKey())) {
+ originalCopy.set(entry.getKey(), entry.getValue());
+ }
+ }
+
+ Assert.assertEquals(getContents(originalCopy), getContents(unsetCopy));
+ Assert.assertEquals(getContents(originalCopy), getContents(clearCopy));
+ }
+
+ @Ignore
+ // Ignored until dependency gets updated to Hadoop >=1.2.0 and we have
+ // Configuration#unset
+ @Test
+ public void testEquivalenceOnTypes() {
+ Configuration unsetCopy = new Configuration(original), clearCopy = new Configuration(
+ original);
+
+ Assert.assertEquals(getContents(original), getContents(unsetCopy));
+ Assert.assertEquals(getContents(original), getContents(clearCopy));
+
+ Map<String, String> entriesToUnset = new HashMap<String, String>();
+ entriesToUnset.put("long", "foo");
+ entriesToUnset.put("boolean", "bar");
+ entriesToUnset.put("integer", "foobar");
+
+ storage.simpleUnset(unsetCopy, entriesToUnset);
+ storage.clearUnset(clearCopy, entriesToUnset);
+
+ Configuration originalCopy = new Configuration();
+ for (Entry<String, String> entry : original) {
+ if (!"long".equals(entry.getKey())
+ && !"boolean".equals(entry.getKey())
+ && !"integer".equals(entry.getKey())) {
+ originalCopy.set(entry.getKey(), entry.getValue());
+ }
+ }
+
+ Assert.assertEquals(getContents(originalCopy), getContents(unsetCopy));
+ Assert.assertEquals(getContents(originalCopy), getContents(clearCopy));
+ }
+
+ @Ignore
+ // Ignored until dependency gets updated to Hadoop >=1.2.0 and we have
+ // Configuration#unset
+ @Test
+ public void testUnsetEquivalenceOnTypes() {
+ Configuration unsetCopy = new Configuration(original);
+
+ Assert.assertEquals(getContents(original), getContents(unsetCopy));
+
+ Map<String, String> entriesToUnset = new HashMap<String, String>();
+ entriesToUnset.put("long", "foo");
+ entriesToUnset.put("boolean", "bar");
+ entriesToUnset.put("integer", "foobar");
+
+ storage.simpleUnset(unsetCopy, entriesToUnset);
+
+ Configuration originalCopy = new Configuration();
+ for (Entry<String, String> entry : original) {
+ if (!"long".equals(entry.getKey())
+ && !"boolean".equals(entry.getKey())
+ && !"integer".equals(entry.getKey())) {
+ originalCopy.set(entry.getKey(), entry.getValue());
+ }
+ }
+
+ Assert.assertEquals(getContents(originalCopy), getContents(unsetCopy));
+ }
+}
Added: pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloStorageOptions.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloStorageOptions.java?rev=1559022&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloStorageOptions.java (added)
+++ pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloStorageOptions.java Fri Jan 17 06:36:44 2014
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.accumulo;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.commons.cli.ParseException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestAccumuloStorageOptions {
+
+ @Test
+ public void testFetchColumns() throws ParseException, IOException {
+ AbstractAccumuloStorage storage = TestAbstractAccumuloStorage
+ .getAbstractAccumuloStorage("cf1,cf2:cq2");
+
+ List<Column> pairs = new LinkedList<Column>();
+ pairs.add(new Column("cf1"));
+ pairs.add(new Column("cf2:cq2"));
+
+ Assert.assertEquals(pairs, storage.columns);
+ }
+
+ @Test
+ public void testWriteColumns() throws ParseException, IOException {
+ AbstractAccumuloStorage storage = TestAbstractAccumuloStorage
+ .getAbstractAccumuloStorage("foo,bar,baz,foo:bar,foo:baz");
+
+ List<Column> columnNames = Arrays.asList(new Column("foo"), new Column(
+ "bar"), new Column("baz"), new Column("foo:bar"), new Column(
+ "foo:baz"));
+
+ Assert.assertEquals(columnNames, storage.columns);
+ }
+
+ @Test
+ public void testAuths() throws ParseException, IOException {
+ AbstractAccumuloStorage storage = TestAbstractAccumuloStorage
+ .getAbstractAccumuloStorage("", "--authorizations auth1,auth2");
+
+ Authorizations auths = new Authorizations("auth1,auth2");
+
+ Assert.assertEquals(auths, storage.authorizations);
+
+ storage = TestAbstractAccumuloStorage.getAbstractAccumuloStorage("",
+ "-auths auth1,auth2");
+ Assert.assertEquals(auths, storage.authorizations);
+ }
+
+ @Test
+ public void testStartEndRows() throws ParseException, IOException {
+ AbstractAccumuloStorage storage = TestAbstractAccumuloStorage
+ .getAbstractAccumuloStorage("", "--start begin --end finish");
+
+ Assert.assertEquals("begin", storage.start);
+ Assert.assertEquals("finish", storage.end);
+
+ storage = TestAbstractAccumuloStorage.getAbstractAccumuloStorage("",
+ "-s begin -e finish");
+ Assert.assertEquals("begin", storage.start);
+ Assert.assertEquals("finish", storage.end);
+ }
+
+ @Test
+ public void testBatchWriterOptions() throws ParseException, IOException {
+ long buffSize = 1024 * 50;
+ int writeThreads = 8, maxLatency = 30 * 1000;
+
+ AbstractAccumuloStorage storage = TestAbstractAccumuloStorage
+ .getAbstractAccumuloStorage("", "--mutation-buffer-size "
+ + buffSize + " --write-threads " + writeThreads
+ + " --max-latency " + maxLatency);
+
+ Assert.assertEquals(buffSize, storage.maxMutationBufferSize);
+ Assert.assertEquals(writeThreads, storage.maxWriteThreads);
+ Assert.assertEquals(maxLatency, storage.maxLatency);
+
+ storage = TestAbstractAccumuloStorage.getAbstractAccumuloStorage("",
+ "-buff " + buffSize + " -wt " + writeThreads + " -ml "
+ + maxLatency);
+
+ Assert.assertEquals(buffSize, storage.maxMutationBufferSize);
+ Assert.assertEquals(writeThreads, storage.maxWriteThreads);
+ Assert.assertEquals(maxLatency, storage.maxLatency);
+ }
+
+ @Test
+ public void testColumnOptions() throws ParseException, IOException {
+ AbstractAccumuloStorage storage = TestAbstractAccumuloStorage
+ .getAbstractAccumuloStorage("foo-bar ",
+ "--ignore-whitespace false --separator -");
+
+ Assert.assertEquals(2, storage.columns.size());
+
+ Assert.assertEquals("foo", storage.columns.get(0).getColumnFamily());
+ Assert.assertEquals("bar ", storage.columns.get(1).getColumnFamily());
+ }
+}