You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text export.
Posted to commits@accumulo.apache.org by el...@apache.org on 2013/11/07 06:24:03 UTC
[1/5] git commit: ACCUMULO-1783 Add in maven-compiler-plugin
configuration to make eclipse stop defaulting to 1.5
Updated Branches:
refs/heads/ACCUMULO-1783 30fd9aa6c -> 22498f775
ACCUMULO-1783 Add in maven-compiler-plugin configuration to make eclipse
stop defaulting to 1.5
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/dd212693
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/dd212693
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/dd212693
Branch: refs/heads/ACCUMULO-1783
Commit: dd212693c8ed9f8d17ef6639f0e65d6d96ffb7d0
Parents: 30fd9aa
Author: Josh Elser <el...@apache.org>
Authored: Sun Nov 3 22:20:20 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Sun Nov 3 22:20:20 2013 -0500
----------------------------------------------------------------------
pom.xml | 13 +++++++++++++
1 file changed, 13 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/dd212693/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b096123..963dab6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,6 +20,19 @@
<artifactId>accumulo-pig</artifactId>
<version>1.4.4-SNAPSHOT</version>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
<dependencies>
<dependency>
<groupId>org.apache.pig</groupId>
[5/5] git commit: ACCUMULO-1783 Rework a bit of the storage
set(Store)Locations code to account for the upstream changes in the input
format code.
Posted by el...@apache.org.
ACCUMULO-1783 Rework a bit of the storage set(Store)Locations code to
account for the upstream changes in the input format code.
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/22498f77
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/22498f77
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/22498f77
Branch: refs/heads/ACCUMULO-1783
Commit: 22498f775db53351d61996f94bfd027a8dbbdf0b
Parents: 904604d
Author: Josh Elser <el...@apache.org>
Authored: Thu Nov 7 00:22:09 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Thu Nov 7 00:22:09 2013 -0500
----------------------------------------------------------------------
.../accumulo/pig/AbstractAccumuloStorage.java | 304 ++++++++++++++++---
.../pig/AbstractAccumuloStorageTest.java | 25 +-
.../accumulo/pig/AccumuloPigClusterTest.java | 4 -
.../pig/AccumuloWholeRowStorageTest.java | 11 +-
4 files changed, 279 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/22498f77/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 9473753..1d53371 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -16,24 +16,31 @@
*/
package org.apache.accumulo.pig;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.mapreduce.SequencedFormatHelper;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -48,8 +55,8 @@ import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadStoreCaster;
import org.apache.pig.ResourceSchema;
-import org.apache.pig.StoreFuncInterface;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataBag;
@@ -72,7 +79,8 @@ import org.joda.time.DateTime;
public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreFuncInterface {
private static final Log LOG = LogFactory.getLog(AbstractAccumuloStorage.class);
- private static final String COLON = ":", COMMA = ",";
+ private static final String COLON = ":", COMMA = ",", PERIOD = ".";
+ private static final String INPUT_PREFIX = AccumuloInputFormat.class.getSimpleName(), OUTPUT_PREFIX = AccumuloOutputFormat.class.getSimpleName();
private Configuration conf;
private RecordReader<Key,Value> reader;
@@ -204,36 +212,40 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
conf = job.getConfiguration();
setLocationFromUri(location);
- int sequence = AccumuloInputFormat.nextSequence();
-
- // TODO Something more.. "certain".
- if (conf.getBoolean(AccumuloInputFormat.class.getSimpleName() + ".configured." + sequence, false)) {
- LOG.warn("InputFormat already configured for " + sequence);
- return;
- // throw new RuntimeException("Was provided sequence number which was already configured: " + sequence);
- }
-
- AccumuloInputFormat.setInputInfo(conf, sequence, user, password.getBytes(), table, authorizations);
- AccumuloInputFormat.setZooKeeperInstance(conf, sequence, inst, zookeepers);
- if (columnFamilyColumnQualifierPairs.size() > 0) {
- LOG.info("columns: " + columnFamilyColumnQualifierPairs);
- AccumuloInputFormat.fetchColumns(conf, sequence, columnFamilyColumnQualifierPairs);
+ Map<String,String> entries = AccumuloInputFormat.getRelevantEntries(conf);
+
+ if (shouldSetInput(entries)) {
+ int sequence = AccumuloInputFormat.nextSequence(conf);
+
+ // TODO Something more.. "certain".
+ if (conf.getBoolean(AccumuloInputFormat.class.getSimpleName() + ".configured." + sequence, false)) {
+ LOG.warn("InputFormat already configured for " + sequence);
+ return;
+ // throw new RuntimeException("Was provided sequence number which was already configured: " + sequence);
+ }
+
+ AccumuloInputFormat.setInputInfo(conf, sequence, user, password.getBytes(), table, authorizations);
+ AccumuloInputFormat.setZooKeeperInstance(conf, sequence, inst, zookeepers);
+ if (columnFamilyColumnQualifierPairs.size() > 0) {
+ LOG.info("columns: " + columnFamilyColumnQualifierPairs);
+ AccumuloInputFormat.fetchColumns(conf, sequence, columnFamilyColumnQualifierPairs);
+ }
+
+ Collection<Range> ranges = Collections.singleton(new Range(start, end));
+
+ LOG.info("Scanning Accumulo for " + ranges);
+
+ AccumuloInputFormat.setRanges(conf, sequence, ranges);
+
+ configureInputFormat(conf, sequence);
}
-
- Collection<Range> ranges = Collections.singleton(new Range(start, end));
-
- LOG.info("Scanning Accumulo for " + ranges);
-
- AccumuloInputFormat.setRanges(conf, sequence, ranges);
-
- configureInputFormat(conf, sequence);
}
protected void configureInputFormat(Configuration conf, int sequence) {
}
- protected void configureOutputFormat(Configuration conf) {
+ protected void configureOutputFormat(Configuration conf, int sequence) {
}
@@ -268,22 +280,240 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
conf = job.getConfiguration();
setLocationFromUri(location);
- int sequence = AccumuloOutputFormat.nextSequence();
+ Map<String,String> entries = AccumuloOutputFormat.getRelevantEntries(conf);
- // TODO Something more.. "certain".
- if (conf.getBoolean(AccumuloOutputFormat.class.getSimpleName() + ".configured." + sequence, false)) {
- LOG.warn("OutputFormat already configured for " + sequence);
- return;
- // throw new RuntimeException("Was provided sequence number which was already configured: " + sequence);
+ if (shouldSetOutput(entries)) {
+ int sequence = AccumuloOutputFormat.nextSequence(conf);
+
+ // TODO Something more.. "certain".
+ if (conf.getBoolean(AccumuloOutputFormat.class.getSimpleName() + ".configured." + sequence, false)) {
+ LOG.warn("OutputFormat already configured for " + sequence);
+ return;
+ // throw new RuntimeException("Was provided sequence number which was already configured: " + sequence);
+ }
+
+ AccumuloOutputFormat.setOutputInfo(conf, sequence, user, password.getBytes(), true, table);
+ AccumuloOutputFormat.setZooKeeperInstance(conf, sequence, inst, zookeepers);
+ AccumuloOutputFormat.setMaxLatency(conf, sequence, maxLatency);
+ AccumuloOutputFormat.setMaxMutationBufferSize(conf, sequence, maxMutationBufferSize);
+ AccumuloOutputFormat.setMaxWriteThreads(conf, sequence, maxWriteThreads);
+
+ configureOutputFormat(conf, sequence);
+ } else {
+ LOG.debug("Not setting configuration as it appears that it has already been set");
}
+ }
+
+ private boolean shouldSetInput(Map<String,String> configEntries) {
+ Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries);
- AccumuloOutputFormat.setOutputInfo(conf, sequence, user, password.getBytes(), true, table);
- AccumuloOutputFormat.setZooKeeperInstance(conf, sequence, inst, zookeepers);
- AccumuloOutputFormat.setMaxLatency(conf, sequence, maxLatency);
- AccumuloOutputFormat.setMaxMutationBufferSize(conf, sequence, maxMutationBufferSize);
- AccumuloOutputFormat.setMaxWriteThreads(conf, sequence, maxWriteThreads);
+ for (Map<String,String> group : groupedConfigEntries.values()) {
+ if (null != inst) {
+ if (!inst.equals(group.get(INPUT_PREFIX + ".instanceName"))) {
+ continue;
+ }
+ } else if (null != group.get(INPUT_PREFIX + ".instanceName")) {
+ continue;
+ }
+
+ if (null != zookeepers) {
+ if (!zookeepers.equals(group.get(INPUT_PREFIX + ".zooKeepers"))) {
+ continue;
+ }
+ } else if (null != group.get(INPUT_PREFIX + ".zooKeepers")) {
+ continue;
+ }
+
+ if (null != user) {
+ if (!user.equals(group.get(INPUT_PREFIX + ".username"))) {
+ continue;
+ }
+ } else if (null != group.get(INPUT_PREFIX + ".username")) {
+ continue;
+ }
+
+ if (null != password) {
+ if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(INPUT_PREFIX + ".password"))) {
+ continue;
+ }
+ } else if (null != group.get(INPUT_PREFIX + ".password")) {
+ continue;
+ }
+
+ if (null != table) {
+ if (!table.equals(group.get(INPUT_PREFIX + ".tablename"))) {
+ continue;
+ }
+ } else if (null != group.get(INPUT_PREFIX + ".tablename")) {
+ continue;
+ }
+
+ if (null != authorizations) {
+ if (!authorizations.serialize().equals(group.get(INPUT_PREFIX + ".authorizations"))) {
+ continue;
+ }
+ } else if (null != group.get(INPUT_PREFIX + ".authorizations")) {
+ continue;
+ }
+
+ String columnValues = group.get(INPUT_PREFIX + ".columns");
+ if (null != columnFamilyColumnQualifierPairs) {
+ StringBuilder expected = new StringBuilder(128);
+ for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
+ if (0 < expected.length()) {
+ expected.append(COMMA);
+ }
+
+ expected.append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst()))));
+ if (column.getSecond() != null)
+ expected.append(COLON).append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond()))));
+ }
+
+ if (!expected.toString().equals(columnValues)) {
+ continue;
+ }
+ } else if (null != columnValues){
+ continue;
+ }
+
+ Range expected = new Range(start, end);
+ String serializedRanges = group.get(INPUT_PREFIX + ".ranges");
+ if (null != serializedRanges) {
+ try {
+ // We currently only support serializing one Range into the Configuration from this Storage class
+ ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serializedRanges.getBytes()));
+ Range range = new Range();
+ range.readFields(new DataInputStream(bais));
+
+ if (!expected.equals(range)) {
+ continue;
+ }
+ } catch (IOException e) {
+ // Got an exception, they don't match
+ continue;
+ }
+ }
+
+
+ // We found a group of entries in the config which are (similar to) what
+ // we would have set.
+ return false;
+ }
+
+ // We didn't find any entries that seemed to match, write the config
+ return true;
+ }
+
+ private boolean shouldSetOutput(Map<String,String> configEntries) {
+ Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries);
+
+ for (Map<String,String> group : groupedConfigEntries.values()) {
+ if (null != inst) {
+ if (!inst.equals(group.get(OUTPUT_PREFIX + ".instanceName"))) {
+ continue;
+ }
+ } else if (null != group.get(OUTPUT_PREFIX + ".instanceName")) {
+ continue;
+ }
+
+ if (null != zookeepers) {
+ if (!zookeepers.equals(group.get(OUTPUT_PREFIX + ".zooKeepers"))) {
+ continue;
+ }
+ } else if (null != group.get(OUTPUT_PREFIX + ".zooKeepers")) {
+ continue;
+ }
+
+ if (null != user) {
+ if (!user.equals(group.get(OUTPUT_PREFIX + ".username"))) {
+ continue;
+ }
+ } else if (null != group.get(OUTPUT_PREFIX + ".username")) {
+ continue;
+ }
+
+ if (null != password) {
+ if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(OUTPUT_PREFIX + ".password"))) {
+ continue;
+ }
+ } else if (null != group.get(OUTPUT_PREFIX + ".password")) {
+ continue;
+ }
+
+ if (null != table) {
+ if (!table.equals(group.get(OUTPUT_PREFIX + ".defaulttable"))) {
+ continue;
+ }
+ } else if (null != group.get(OUTPUT_PREFIX + ".defaulttable")) {
+ continue;
+ }
+
+ String writeThreadsStr = group.get(OUTPUT_PREFIX + ".writethreads");
+ try {
+ if (null == writeThreadsStr || maxWriteThreads != Integer.parseInt(writeThreadsStr)) {
+ continue;
+ }
+ } catch (NumberFormatException e) {
+ // Wasn't a number, so it can't match what we were expecting
+ continue;
+ }
+
+ String mutationBufferStr = group.get(OUTPUT_PREFIX + ".maxmemory");
+ try {
+ if (null == mutationBufferStr || maxMutationBufferSize != Long.parseLong(mutationBufferStr)) {
+ continue;
+ }
+ } catch (NumberFormatException e) {
+ // Wasn't a number, so it can't match what we were expecting
+ continue;
+ }
+
+ String maxLatencyStr = group.get(OUTPUT_PREFIX + ".maxlatency");
+ try {
+ if (null == maxLatencyStr || maxLatency != Long.parseLong(maxLatencyStr)) {
+ continue;
+ }
+ } catch (NumberFormatException e) {
+ // Wasn't a number, so it can't match what we were expecting
+ continue;
+ }
+
+ // We found a group of entries in the config which are (similar to) what
+ // we would have set.
+ return false;
+ }
+
+ // We didn't find any entries that seemed to match, write the config
+ return true;
+ }
+
+ private Map<Integer,Map<String,String>> permuteConfigEntries(Map<String,String> entries) {
+ Map<Integer,Map<String,String>> groupedEntries = new HashMap<Integer,Map<String,String>>();
+ for (Entry<String,String> entry : entries.entrySet()) {
+ final String key = entry.getKey(), value = entry.getValue();
+
+ if (key.endsWith(SequencedFormatHelper.CONFIGURED_SEQUENCES)) {
+ continue;
+ }
+
+ int index = key.lastIndexOf(PERIOD);
+ if (-1 != index) {
+ int group = Integer.parseInt(key.substring(index + 1));
+ String name = key.substring(0, index);
+
+ Map<String,String> entriesInGroup = groupedEntries.get(group);
+ if (null == entriesInGroup) {
+ entriesInGroup = new HashMap<String,String>();
+ groupedEntries.put(group, entriesInGroup);
+ }
+
+ entriesInGroup.put(name, value);
+ } else {
+ LOG.warn("Could not parse key: " + key);
+ }
+ }
- configureOutputFormat(conf);
+ return groupedEntries;
}
@SuppressWarnings("rawtypes")
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/22498f77/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
index e9f0297..bc886ec 100644
--- a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
@@ -33,24 +33,18 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.data.Tuple;
-import org.junit.Before;
import org.junit.Test;
public class AbstractAccumuloStorageTest {
- @Before
- public void setup() {
- AccumuloInputFormat.resetCounters();
- AccumuloOutputFormat.resetCounters();
- }
-
- public Job getExpectedLoadJob(int sequence, String inst, String zookeepers, String user, String password, String table, String start, String end,
+ public Job getExpectedLoadJob(String inst, String zookeepers, String user, String password, String table, String start, String end,
Authorizations authorizations, List<Pair<Text,Text>> columnFamilyColumnQualifierPairs) throws IOException {
Collection<Range> ranges = new LinkedList<Range>();
ranges.add(new Range(start, end));
Job expected = new Job();
Configuration expectedConf = expected.getConfiguration();
+ final int sequence = AccumuloInputFormat.nextSequence(expectedConf);
AccumuloInputFormat.setInputInfo(expectedConf, sequence, user, password.getBytes(), table, authorizations);
AccumuloInputFormat.setZooKeeperInstance(expectedConf, sequence, inst, zookeepers);
AccumuloInputFormat.fetchColumns(expectedConf, sequence, columnFamilyColumnQualifierPairs);
@@ -58,7 +52,7 @@ public class AbstractAccumuloStorageTest {
return expected;
}
- public Job getDefaultExpectedLoadJob(int sequence) throws IOException {
+ public Job getDefaultExpectedLoadJob() throws IOException {
String inst = "myinstance";
String zookeepers = "127.0.0.1:2181";
String user = "root";
@@ -73,14 +67,15 @@ public class AbstractAccumuloStorageTest {
columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text("col2"), new Text("cq2")));
columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text("col3"), null));
- Job expected = getExpectedLoadJob(sequence, inst, zookeepers, user, password, table, start, end, authorizations, columnFamilyColumnQualifierPairs);
+ Job expected = getExpectedLoadJob(inst, zookeepers, user, password, table, start, end, authorizations, columnFamilyColumnQualifierPairs);
return expected;
}
- public Job getExpectedStoreJob(int sequence, String inst, String zookeepers, String user, String password, String table, long maxWriteBufferSize, int writeThreads,
+ public Job getExpectedStoreJob(String inst, String zookeepers, String user, String password, String table, long maxWriteBufferSize, int writeThreads,
int maxWriteLatencyMS) throws IOException {
Job expected = new Job();
Configuration expectedConf = expected.getConfiguration();
+ final int sequence = AccumuloOutputFormat.nextSequence(expectedConf);
AccumuloOutputFormat.setOutputInfo(expectedConf, sequence,user, password.getBytes(), true, table);
AccumuloOutputFormat.setZooKeeperInstance(expectedConf, sequence,inst, zookeepers);
AccumuloOutputFormat.setMaxLatency(expectedConf, sequence,maxWriteLatencyMS);
@@ -90,7 +85,7 @@ public class AbstractAccumuloStorageTest {
return expected;
}
- public Job getDefaultExpectedStoreJob(int sequence) throws IOException {
+ public Job getDefaultExpectedStoreJob() throws IOException {
String inst = "myinstance";
String zookeepers = "127.0.0.1:2181";
String user = "root";
@@ -100,7 +95,7 @@ public class AbstractAccumuloStorageTest {
int writeThreads = 7;
int maxWriteLatencyMS = 30000;
- Job expected = getExpectedStoreJob(sequence, inst, zookeepers, user, password, table, maxWriteBufferSize, writeThreads, maxWriteLatencyMS);
+ Job expected = getExpectedStoreJob(inst, zookeepers, user, password, table, maxWriteBufferSize, writeThreads, maxWriteLatencyMS);
return expected;
}
@@ -136,7 +131,7 @@ public class AbstractAccumuloStorageTest {
s.setLocation(getDefaultLoadLocation(), actual);
Configuration actualConf = actual.getConfiguration();
- Job expected = getDefaultExpectedLoadJob(1);
+ Job expected = getDefaultExpectedLoadJob();
Configuration expectedConf = expected.getConfiguration();
TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
@@ -150,7 +145,7 @@ public class AbstractAccumuloStorageTest {
s.setStoreLocation(getDefaultStoreLocation(), actual);
Configuration actualConf = actual.getConfiguration();
- Job expected = getDefaultExpectedStoreJob(1);
+ Job expected = getDefaultExpectedStoreJob();
Configuration expectedConf = expected.getConfiguration();
TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/22498f77/src/test/java/org/apache/accumulo/pig/AccumuloPigClusterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloPigClusterTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloPigClusterTest.java
index 0e2abb5..10b0a2a 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloPigClusterTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloPigClusterTest.java
@@ -10,8 +10,6 @@ import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.accumulo.minicluster.MiniAccumuloConfig;
@@ -54,8 +52,6 @@ public class AccumuloPigClusterTest {
@Before
public void beforeTest() throws Exception {
- AccumuloInputFormat.resetCounters();
- AccumuloOutputFormat.resetCounters();
pig = new PigServer(ExecType.LOCAL, conf);
}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/22498f77/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
index 3a0ab85..b57c7ba 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
@@ -30,7 +30,6 @@ import java.util.List;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.data.ColumnUpdate;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
@@ -43,16 +42,9 @@ import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DefaultDataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
-import org.junit.Before;
import org.junit.Test;
public class AccumuloWholeRowStorageTest {
-
- @Before
- public void setup() {
- AccumuloInputFormat.resetCounters();
- AccumuloOutputFormat.resetCounters();
- }
@Test
public void testConfiguration() throws IOException {
@@ -64,9 +56,10 @@ public class AccumuloWholeRowStorageTest {
s.setLocation(test.getDefaultLoadLocation(), actual);
Configuration actualConf = actual.getConfiguration();
+ // A little brittle. We know this is the sequence number used when we create the DefaultExpectedLoadJob()
final int sequence = 1;
- Job expected = test.getDefaultExpectedLoadJob(sequence);
+ Job expected = test.getDefaultExpectedLoadJob();
Configuration expectedConf = expected.getConfiguration();
AccumuloInputFormat.addIterator(expectedConf, sequence, new IteratorSetting(50, WholeRowIterator.class));
[3/5] git commit: ACCUMULO-1783 Lift some pig test classes to write a
better "functional" test that ensures that joins actually work.
Posted by el...@apache.org.
ACCUMULO-1783 Lift some pig test classes to write a better "functional"
test that ensures that joins actually work.
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/9b398d4a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/9b398d4a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/9b398d4a
Branch: refs/heads/ACCUMULO-1783
Commit: 9b398d4a32e50d3503b4ecb2f86a306e9db0221b
Parents: d72e1cb
Author: Josh Elser <el...@apache.org>
Authored: Tue Nov 5 17:14:33 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Tue Nov 5 17:14:33 2013 -0500
----------------------------------------------------------------------
.../accumulo/pig/AccumuloPigClusterTest.java | 165 +++++++++++++++++++
.../java/org/apache/pig/test/MiniCluster.java | 86 ++++++++++
.../org/apache/pig/test/MiniGenericCluster.java | 123 ++++++++++++++
3 files changed, 374 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/9b398d4a/src/test/java/org/apache/accumulo/pig/AccumuloPigClusterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloPigClusterTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloPigClusterTest.java
new file mode 100644
index 0000000..0e2abb5
--- /dev/null
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloPigClusterTest.java
@@ -0,0 +1,165 @@
+package org.apache.accumulo.pig;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.accumulo.minicluster.MiniAccumuloConfig;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+public class AccumuloPigClusterTest {
+
+ private static final File tmpdir = Files.createTempDir();
+ private static MiniAccumuloCluster accumuloCluster;
+ private static MiniCluster cluster;
+ private static Configuration conf;
+ private PigServer pig;
+
+ @BeforeClass
+ public static void setupClusters() throws Exception {
+ MiniAccumuloConfig macConf = new MiniAccumuloConfig(tmpdir, "password");
+ macConf.setNumTservers(1);
+
+ accumuloCluster = new MiniAccumuloCluster(macConf);
+ accumuloCluster.start();
+
+ // This is needed by Pig
+ cluster = MiniCluster.buildCluster();
+ conf = cluster.getConfiguration();
+ }
+
+ @Before
+ public void beforeTest() throws Exception {
+ AccumuloInputFormat.resetCounters();
+ AccumuloOutputFormat.resetCounters();
+ pig = new PigServer(ExecType.LOCAL, conf);
+ }
+
+ @AfterClass
+ public static void stopClusters() throws Exception {
+ accumuloCluster.stop();
+ FileUtils.deleteDirectory(tmpdir);
+ }
+
+ private void loadTestData() throws Exception {
+ ZooKeeperInstance inst = new ZooKeeperInstance(accumuloCluster.getInstanceName(), accumuloCluster.getZooKeepers());
+ Connector c = inst.getConnector("root", "password");
+
+ TableOperations tops = c.tableOperations();
+ if (!tops.exists("airports")) {
+ tops.create("airports");
+ }
+
+ if (!tops.exists("flights")) {
+ tops.create("flights");
+ }
+
+ @SuppressWarnings("unchecked")
+ final List<ImmutableMap<String,String>> airportData = Lists.newArrayList(ImmutableMap.of("code", "SJC", "name", "San Jose"),
+ ImmutableMap.of("code", "SFO", "name", "San Francisco"), ImmutableMap.of("code", "MDO", "name", "Orlando"),
+ ImmutableMap.of("code", "MDW", "name", "Chicago-Midway"), ImmutableMap.of("code", "JFK", "name", "JFK International"),
+ ImmutableMap.of("code", "BWI", "name", "Baltimore-Washington"));
+
+ BatchWriter bw = c.createBatchWriter("airports", 100000l, 1000l, 1);
+ try {
+ int i = 1;
+ for (Map<String,String> record : airportData) {
+ Mutation m = new Mutation(Integer.toString(i));
+
+ for (Entry<String,String> entry : record.entrySet()) {
+ m.put(entry.getKey(), "", entry.getValue());
+ }
+
+ bw.addMutation(m);
+ i++;
+ }
+ } finally {
+ if (null != bw) {
+ bw.close();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ final List<ImmutableMap<String,String>> flightData = Lists.newArrayList(ImmutableMap.of("origin", "BWI", "destination", "SFO"),
+ ImmutableMap.of("origin", "BWI", "destination", "SJC"), ImmutableMap.of("origin", "MDW", "destination", "MDO"),
+ ImmutableMap.of("origin", "MDO", "destination", "SJC"), ImmutableMap.of("origin", "SJC", "destination", "JFK"),
+ ImmutableMap.of("origin", "JFK", "destination", "MDW"));
+
+ bw = c.createBatchWriter("flights", 100000l, 1000l, 1);
+ try {
+ int i = 1;
+ for (Map<String,String> record : flightData) {
+ Mutation m = new Mutation(Integer.toString(i));
+
+ for (Entry<String,String> entry : record.entrySet()) {
+ m.put(entry.getKey(), "", entry.getValue());
+ }
+
+ bw.addMutation(m);
+ i++;
+ }
+ } finally {
+ if (null != bw) {
+ bw.close();
+ }
+ }
+ }
+
+ @Test
+ public void test() throws Exception {
+ loadTestData();
+
+ final String loadFlights = "flights = LOAD 'accumulo://flights?instance=" + accumuloCluster.getInstanceName() +
+ "&user=root&password=password&zookeepers=" + accumuloCluster.getZooKeepers() + "' using org.apache.accumulo.pig.AccumuloStorage()" +
+ " as (rowKey:chararray, column_map:map[]);";
+
+ final String loadAirports = "airports = LOAD 'accumulo://airports?instance=" + accumuloCluster.getInstanceName() +
+ "&user=root&password=password&zookeepers=" + accumuloCluster.getZooKeepers() + "' using org.apache.accumulo.pig.AccumuloStorage()" +
+ " as (rowKey:chararray, column_map:map[]);";
+
+ final String joinQuery = "joined = JOIN flights BY column_map#'origin', airports BY column_map#'code';";
+
+ // System.out.println(query);
+
+ pig.registerQuery(loadFlights);
+ pig.registerQuery(loadAirports);
+ pig.registerQuery(joinQuery);
+
+ Iterator<Tuple> it = pig.openIterator("joined");
+
+ int i = 0;
+ while (it.hasNext()) {
+ Tuple t = it.next();
+ System.out.println(t);
+ i++;
+ }
+
+ // TODO actually verify something here
+ Assert.assertTrue("Should have found records but found none", i > 0);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/9b398d4a/src/test/java/org/apache/pig/test/MiniCluster.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/pig/test/MiniCluster.java b/src/test/java/org/apache/pig/test/MiniCluster.java
new file mode 100644
index 0000000..64467ae
--- /dev/null
+++ b/src/test/java/org/apache/pig/test/MiniCluster.java
@@ -0,0 +1,86 @@
+package org.apache.pig.test;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.MiniMRCluster;
+
+/**
+ * Concrete mini cluster used by the tests: starts a {@link MiniDFSCluster}
+ * and a {@link MiniMRCluster}, writes the resulting job configuration to
+ * {@code build/classes/hadoop-site.xml} and publishes the system properties
+ * Pig reads to find the cluster.
+ */
+public class MiniCluster extends MiniGenericCluster {
+    private static final File CONF_DIR = new File("build/classes");
+    private static final File CONF_FILE = new File(CONF_DIR, "hadoop-site.xml");
+
+    // Deliberately declared without an "= null" initializer. The superclass
+    // constructor calls setupMiniDfsAndMrClusters(), which assigns this field;
+    // an explicit initializer would run after super() returns and reset the
+    // field to null, so shutdownMiniMrClusters() would never stop the cluster.
+    private MiniMRCluster m_mr;
+
+    public MiniCluster() {
+        super();
+    }
+
+    /**
+     * Starts the mini DFS and MapReduce clusters, writes their configuration
+     * to hadoop-site.xml and sets the "cluster", "namenode" and
+     * "junit.hadoop.conf" system properties.
+     *
+     * @throws RuntimeException wrapping any IOException raised during setup
+     */
+    @Override
+    protected void setupMiniDfsAndMrClusters() {
+        try {
+            System.setProperty("hadoop.log.dir", "build/test/logs");
+            final int dataNodes = 4;     // There will be 4 data nodes
+            final int taskTrackers = 4;  // There will be 4 task tracker nodes
+
+            // Create the dir that holds hadoop-site.xml file
+            // Delete if hadoop-site.xml exists already
+            CONF_DIR.mkdirs();
+            if (CONF_FILE.exists()) {
+                CONF_FILE.delete();
+            }
+
+            // Builds and starts the mini dfs and mapreduce clusters
+            Configuration config = new Configuration();
+            m_dfs = new MiniDFSCluster(config, dataNodes, true, null);
+            m_fileSys = m_dfs.getFileSystem();
+            m_mr = new MiniMRCluster(taskTrackers, m_fileSys.getUri().toString(), 1);
+
+            // Write the necessary config info to hadoop-site.xml
+            m_conf = m_mr.createJobConf();
+            m_conf.setInt("mapred.submit.replication", 2);
+            m_conf.set("dfs.datanode.address", "0.0.0.0:0");
+            m_conf.set("dfs.datanode.http.address", "0.0.0.0:0");
+            m_conf.set("mapred.map.max.attempts", "2");
+            m_conf.set("mapred.reduce.max.attempts", "2");
+            m_conf.set("pig.jobcontrol.sleep", "100");
+
+            // Close the stream explicitly so the file handle is not leaked
+            // (try/finally because the build targets Java 1.6).
+            FileOutputStream confOut = new FileOutputStream(CONF_FILE);
+            try {
+                m_conf.writeXml(confOut);
+            } finally {
+                confOut.close();
+            }
+
+            // Set the system properties needed by Pig
+            System.setProperty("cluster", m_conf.get("mapred.job.tracker"));
+            System.setProperty("namenode", m_conf.get("fs.default.name"));
+            System.setProperty("junit.hadoop.conf", CONF_DIR.getPath());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Removes the generated hadoop-site.xml and shuts down the MapReduce
+     * cluster if one is running.
+     */
+    @Override
+    protected void shutdownMiniMrClusters() {
+        // Delete hadoop-site.xml on shutDown
+        if (CONF_FILE.exists()) {
+            CONF_FILE.delete();
+        }
+        if (m_mr != null) {
+            m_mr.shutdown();
+        }
+        m_mr = null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/9b398d4a/src/test/java/org/apache/pig/test/MiniGenericCluster.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/pig/test/MiniGenericCluster.java b/src/test/java/org/apache/pig/test/MiniGenericCluster.java
new file mode 100644
index 0000000..584631a
--- /dev/null
+++ b/src/test/java/org/apache/pig/test/MiniGenericCluster.java
@@ -0,0 +1,123 @@
+package org.apache.pig.test;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import java.io.*;
+import java.util.Properties;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+
+/**
+ * Base class for the test mini cluster, built as a singleton: initializing
+ * this class creates the one shared {@link MiniCluster} instance, and the
+ * constructor immediately calls {@link #setupMiniDfsAndMrClusters()}.
+ * Per the original description, the mini cluster consists of a mini DFS
+ * cluster and a mini MapReduce cluster on the local machine, and setup also
+ * prepares the environment for Pig to run on top of the mini cluster.
+ *
+ * This class is the base class for MiniCluster, whose implementation differs
+ * slightly between versions of Hadoop. The original comment places the
+ * MiniCluster implementation in $PIG_HOME/shims.
+ * NOTE(review): confirm that location still applies to this copy.
+ */
+abstract public class MiniGenericCluster {
+ // Cluster handles and job configuration; declared here and torn down here,
+ // expected to be assigned by the subclass's setupMiniDfsAndMrClusters().
+ protected MiniDFSCluster m_dfs = null;
+ protected FileSystem m_fileSys = null;
+ protected Configuration m_conf = null;
+
+ // The single shared instance; constructing it runs the cluster setup.
+ protected final static MiniCluster INSTANCE = new MiniCluster();
+ // Cleared by shutdownMiniDfsAndMrClusters() and set again by buildCluster().
+ protected static boolean isSetup = true;
+
+ /**
+ * Starts the clusters as part of construction by calling the
+ * subclass-provided setup method.
+ */
+ protected MiniGenericCluster() {
+ setupMiniDfsAndMrClusters();
+ }
+
+ /**
+ * Starts the mini DFS and MapReduce clusters. Called from the constructor
+ * and again from buildCluster() after a shutdown.
+ */
+ abstract protected void setupMiniDfsAndMrClusters();
+
+ /**
+ * Returns the single instance of class MiniCluster that
+ * represents the resources for a mini dfs cluster and a mini
+ * mapreduce cluster. If the clusters were shut down, they are
+ * set up again before the instance is returned.
+ */
+ public static MiniCluster buildCluster() {
+ if(! isSetup){
+ INSTANCE.setupMiniDfsAndMrClusters();
+ isSetup = true;
+ }
+ return INSTANCE;
+ }
+
+ /**
+ * Shuts down the clusters of the shared instance (always INSTANCE,
+ * whichever object this is called on).
+ */
+ public void shutDown(){
+ INSTANCE.shutdownMiniDfsAndMrClusters();
+ }
+
+ /**
+ * Shuts the clusters down when this object is finalized.
+ */
+ protected void finalize() {
+ shutdownMiniDfsAndMrClusters();
+ }
+
+ /**
+ * Marks the cluster as not set up, then shuts down the DFS cluster
+ * followed by the MapReduce cluster.
+ */
+ protected void shutdownMiniDfsAndMrClusters() {
+ isSetup = false;
+ shutdownMiniDfsClusters();
+ shutdownMiniMrClusters();
+ }
+
+ /**
+ * Closes the file system and shuts down the DFS cluster, then clears both
+ * references. A failure to close the file system is printed and does not
+ * prevent the DFS cluster from being shut down.
+ */
+ protected void shutdownMiniDfsClusters() {
+ try {
+ if (m_fileSys != null) { m_fileSys.close(); }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ if (m_dfs != null) { m_dfs.shutdown(); }
+ m_fileSys = null;
+ m_dfs = null;
+ }
+
+ /**
+ * Shuts down the MapReduce side of the mini cluster.
+ */
+ abstract protected void shutdownMiniMrClusters();
+
+ /**
+ * Returns the cluster configuration converted to Properties.
+ *
+ * @throws RuntimeException if the cluster has been shut down
+ */
+ public Properties getProperties() {
+ errorIfNotSetup();
+ return ConfigurationUtil.toProperties(m_conf);
+ }
+
+ /**
+ * Returns a new Configuration created from the cluster configuration.
+ * Unlike the other accessors this does not call errorIfNotSetup().
+ * NOTE(review): confirm that skipping the check is intentional.
+ */
+ public Configuration getConfiguration() {
+ return new Configuration(m_conf);
+ }
+
+ /**
+ * Sets a property on the cluster configuration.
+ *
+ * @throws RuntimeException if the cluster has been shut down
+ */
+ public void setProperty(String name, String value) {
+ errorIfNotSetup();
+ m_conf.set(name, value);
+ }
+
+ /**
+ * Returns the file system of the mini DFS cluster.
+ *
+ * @throws RuntimeException if the cluster has been shut down
+ */
+ public FileSystem getFileSystem() {
+ errorIfNotSetup();
+ return m_fileSys;
+ }
+
+ /**
+ * Throw RuntimeException if isSetup is false
+ */
+ private void errorIfNotSetup(){
+ if(isSetup)
+ return;
+ String msg = "function called on MiniCluster that has been shutdown";
+ throw new RuntimeException(msg);
+ }
+}
[2/5] git commit: ACCUMULO-1783 Use the sequence id to ensure AIF and
AOF don't clobber one another.
Posted by el...@apache.org.
ACCUMULO-1783 Use the sequence id to ensure AIF and AOF don't clobber
one another.
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/d72e1cb5
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/d72e1cb5
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/d72e1cb5
Branch: refs/heads/ACCUMULO-1783
Commit: d72e1cb56b1c16bd09bff28d2d72163063781d63
Parents: dd21269
Author: Josh Elser <el...@apache.org>
Authored: Tue Nov 5 17:08:08 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Tue Nov 5 17:08:08 2013 -0500
----------------------------------------------------------------------
pom.xml | 17 +++++++--
.../accumulo/pig/AbstractAccumuloStorage.java | 36 +++++++++++---------
.../apache/accumulo/pig/AccumuloStorage.java | 7 ++--
.../accumulo/pig/AccumuloWholeRowStorage.java | 5 +--
.../pig/AbstractAccumuloStorageTest.java | 23 ++++++++-----
.../pig/AccumuloWholeRowStorageTest.java | 14 ++++++--
6 files changed, 69 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/d72e1cb5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 963dab6..0c82c39 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,6 +30,14 @@
<target>1.6</target>
</configuration>
</plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.16</version>
+ <configuration>
+ <argLine>-Xmx4g</argLine>
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
+ </configuration>
+ </plugin>
</plugins>
</build>
@@ -50,9 +58,14 @@
<version>1.2.1</version>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <version>1.2.1</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
- <version>1.4.4</version>
+ <version>1.4.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
@@ -73,7 +86,7 @@
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-minicluster</artifactId>
- <version>1.4.4</version>
+ <version>1.4.5-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/d72e1cb5/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 37efe84..da4a51b 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -204,25 +204,29 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
conf = job.getConfiguration();
setLocationFromUri(location);
- if (!conf.getBoolean(AccumuloInputFormat.class.getSimpleName() + ".configured", false)) {
- AccumuloInputFormat.setInputInfo(conf, user, password.getBytes(), table, authorizations);
- AccumuloInputFormat.setZooKeeperInstance(conf, inst, zookeepers);
- if (columnFamilyColumnQualifierPairs.size() > 0) {
- LOG.info("columns: " + columnFamilyColumnQualifierPairs);
- AccumuloInputFormat.fetchColumns(conf, columnFamilyColumnQualifierPairs);
- }
-
- Collection<Range> ranges = Collections.singleton(new Range(start, end));
-
- LOG.info("Scanning Accumulo for " + ranges);
-
- AccumuloInputFormat.setRanges(conf, ranges);
-
- configureInputFormat(conf);
+ int sequence = AccumuloInputFormat.nextSequence();
+
+ if (conf.getBoolean(AccumuloInputFormat.class.getSimpleName() + ".configured", false)) {
+ throw new RuntimeException("Was provided sequence number which was already configured: " + sequence);
}
+
+ AccumuloInputFormat.setInputInfo(conf, sequence, user, password.getBytes(), table, authorizations);
+ AccumuloInputFormat.setZooKeeperInstance(conf, sequence, inst, zookeepers);
+ if (columnFamilyColumnQualifierPairs.size() > 0) {
+ LOG.info("columns: " + columnFamilyColumnQualifierPairs);
+ AccumuloInputFormat.fetchColumns(conf, sequence, columnFamilyColumnQualifierPairs);
+ }
+
+ Collection<Range> ranges = Collections.singleton(new Range(start, end));
+
+ LOG.info("Scanning Accumulo for " + ranges);
+
+ AccumuloInputFormat.setRanges(conf, sequence, ranges);
+
+ configureInputFormat(conf, sequence);
}
- protected void configureInputFormat(Configuration conf) {
+ protected void configureInputFormat(Configuration conf, int sequence) {
}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/d72e1cb5/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index dcfd888..bd43dce 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -141,8 +141,9 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
return map;
}
- protected void configureInputFormat(Configuration conf) {
- AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class));
+ @Override
+ protected void configureInputFormat(Configuration conf, int sequence) {
+ AccumuloInputFormat.addIterator(conf, sequence, new IteratorSetting(50, WholeRowIterator.class));
}
@Override
@@ -229,7 +230,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
*/
protected void addColumn(Mutation mutation, String columnDef, String columnName, Value columnValue) {
if (null == columnDef && null == columnName) {
- // TODO Emit a counter here somehow?
+ // TODO Emit a counter here somehow? org.apache.pig.tools.pigstats.PigStatusReporter
log.warn("Was provided no name or definition for column. Ignoring value");
return;
}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/d72e1cb5/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
index af3ee01..499558f 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
@@ -83,8 +83,9 @@ public class AccumuloWholeRowStorage extends AbstractAccumuloStorage {
return tuple;
}
- protected void configureInputFormat(Configuration conf) {
- AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class));
+ @Override
+ protected void configureInputFormat(Configuration conf, int sequence) {
+ AccumuloInputFormat.addIterator(conf, sequence, new IteratorSetting(50, WholeRowIterator.class));
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/d72e1cb5/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
index 1b5b81a..5f4ecf2 100644
--- a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
@@ -33,25 +33,32 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.data.Tuple;
+import org.junit.Before;
import org.junit.Test;
public class AbstractAccumuloStorageTest {
- public Job getExpectedLoadJob(String inst, String zookeepers, String user, String password, String table, String start, String end,
+ /**
+ * Resets the counters of both Accumulo formats before each test.
+ * NOTE(review): presumably these are the sequence counters behind
+ * nextSequence(), reset so each test starts from a known sequence number;
+ * confirm against AccumuloInputFormat/AccumuloOutputFormat.
+ */
+ @Before
+ public void setup() {
+ AccumuloInputFormat.resetCounters();
+ AccumuloOutputFormat.resetCounters();
+ }
+
+ public Job getExpectedLoadJob(int sequence, String inst, String zookeepers, String user, String password, String table, String start, String end,
Authorizations authorizations, List<Pair<Text,Text>> columnFamilyColumnQualifierPairs) throws IOException {
Collection<Range> ranges = new LinkedList<Range>();
ranges.add(new Range(start, end));
Job expected = new Job();
Configuration expectedConf = expected.getConfiguration();
- AccumuloInputFormat.setInputInfo(expectedConf, user, password.getBytes(), table, authorizations);
- AccumuloInputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers);
- AccumuloInputFormat.fetchColumns(expectedConf, columnFamilyColumnQualifierPairs);
- AccumuloInputFormat.setRanges(expectedConf, ranges);
+ AccumuloInputFormat.setInputInfo(expectedConf, sequence, user, password.getBytes(), table, authorizations);
+ AccumuloInputFormat.setZooKeeperInstance(expectedConf, sequence, inst, zookeepers);
+ AccumuloInputFormat.fetchColumns(expectedConf, sequence, columnFamilyColumnQualifierPairs);
+ AccumuloInputFormat.setRanges(expectedConf, sequence, ranges);
return expected;
}
- public Job getDefaultExpectedLoadJob() throws IOException {
+ public Job getDefaultExpectedLoadJob(int sequence) throws IOException {
String inst = "myinstance";
String zookeepers = "127.0.0.1:2181";
String user = "root";
@@ -66,7 +73,7 @@ public class AbstractAccumuloStorageTest {
columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text("col2"), new Text("cq2")));
columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text("col3"), null));
- Job expected = getExpectedLoadJob(inst, zookeepers, user, password, table, start, end, authorizations, columnFamilyColumnQualifierPairs);
+ Job expected = getExpectedLoadJob(sequence, inst, zookeepers, user, password, table, start, end, authorizations, columnFamilyColumnQualifierPairs);
return expected;
}
@@ -129,7 +136,7 @@ public class AbstractAccumuloStorageTest {
s.setLocation(getDefaultLoadLocation(), actual);
Configuration actualConf = actual.getConfiguration();
- Job expected = getDefaultExpectedLoadJob();
+ Job expected = getDefaultExpectedLoadJob(1);
Configuration expectedConf = expected.getConfiguration();
TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/d72e1cb5/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
index 690d86c..3a0ab85 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
@@ -30,6 +30,7 @@ import java.util.List;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.data.ColumnUpdate;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
@@ -42,9 +43,16 @@ import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DefaultDataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
+import org.junit.Before;
import org.junit.Test;
public class AccumuloWholeRowStorageTest {
+
+ /**
+ * Resets the counters of both Accumulo formats before each test.
+ * NOTE(review): presumably these are the sequence counters behind
+ * nextSequence(), reset so each test starts from a known sequence number;
+ * confirm against AccumuloInputFormat/AccumuloOutputFormat.
+ */
+ @Before
+ public void setup() {
+ AccumuloInputFormat.resetCounters();
+ AccumuloOutputFormat.resetCounters();
+ }
@Test
public void testConfiguration() throws IOException {
@@ -56,9 +64,11 @@ public class AccumuloWholeRowStorageTest {
s.setLocation(test.getDefaultLoadLocation(), actual);
Configuration actualConf = actual.getConfiguration();
- Job expected = test.getDefaultExpectedLoadJob();
+ final int sequence = 1;
+
+ Job expected = test.getDefaultExpectedLoadJob(sequence);
Configuration expectedConf = expected.getConfiguration();
- AccumuloInputFormat.addIterator(expectedConf, new IteratorSetting(50, WholeRowIterator.class));
+ AccumuloInputFormat.addIterator(expectedConf, sequence, new IteratorSetting(50, WholeRowIterator.class));
TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
}
[4/5] git commit: ACCUMULO-1783 Update the setStoreLocation method
and tests.
Posted by el...@apache.org.
ACCUMULO-1783 Update the setStoreLocation method and tests.
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/904604db
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/904604db
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/904604db
Branch: refs/heads/ACCUMULO-1783
Commit: 904604db28adad26bc0ba02e7c78db15db6424e8
Parents: 9b398d4
Author: Josh Elser <el...@apache.org>
Authored: Tue Nov 5 20:05:08 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Tue Nov 5 20:05:08 2013 -0500
----------------------------------------------------------------------
.../accumulo/pig/AbstractAccumuloStorage.java | 39 +++++++++++++-------
.../pig/AbstractAccumuloStorageTest.java | 18 ++++-----
2 files changed, 34 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/904604db/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index da4a51b..9473753 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -94,7 +94,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
int maxWriteThreads = 10;
long maxMutationBufferSize = 10 * 1000 * 1000;
int maxLatency = 10 * 1000;
-
+
protected LoadStoreCaster caster;
protected ResourceSchema schema;
protected String contextSignature = null;
@@ -206,8 +206,11 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
int sequence = AccumuloInputFormat.nextSequence();
- if (conf.getBoolean(AccumuloInputFormat.class.getSimpleName() + ".configured", false)) {
- throw new RuntimeException("Was provided sequence number which was already configured: " + sequence);
+ // TODO Something more.. "certain".
+ if (conf.getBoolean(AccumuloInputFormat.class.getSimpleName() + ".configured." + sequence, false)) {
+ LOG.warn("InputFormat already configured for " + sequence);
+ return;
+ // throw new RuntimeException("Was provided sequence number which was already configured: " + sequence);
}
AccumuloInputFormat.setInputInfo(conf, sequence, user, password.getBytes(), table, authorizations);
@@ -248,7 +251,8 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
public void setStoreFuncUDFContextSignature(String signature) {
this.contextSignature = signature;
- }
+ }
+
/**
* Returns UDFProperties based on <code>contextSignature</code>.
*/
@@ -264,14 +268,22 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
conf = job.getConfiguration();
setLocationFromUri(location);
- if (!conf.getBoolean(AccumuloOutputFormat.class.getSimpleName() + ".configured", false)) {
- AccumuloOutputFormat.setOutputInfo(conf, user, password.getBytes(), true, table);
- AccumuloOutputFormat.setZooKeeperInstance(conf, inst, zookeepers);
- AccumuloOutputFormat.setMaxLatency(conf, maxLatency);
- AccumuloOutputFormat.setMaxMutationBufferSize(conf, maxMutationBufferSize);
- AccumuloOutputFormat.setMaxWriteThreads(conf, maxWriteThreads);
- configureOutputFormat(conf);
+ int sequence = AccumuloOutputFormat.nextSequence();
+
+ // TODO Something more.. "certain".
+ if (conf.getBoolean(AccumuloOutputFormat.class.getSimpleName() + ".configured." + sequence, false)) {
+ LOG.warn("OutputFormat already configured for " + sequence);
+ return;
+ // throw new RuntimeException("Was provided sequence number which was already configured: " + sequence);
}
+
+ AccumuloOutputFormat.setOutputInfo(conf, sequence, user, password.getBytes(), true, table);
+ AccumuloOutputFormat.setZooKeeperInstance(conf, sequence, inst, zookeepers);
+ AccumuloOutputFormat.setMaxLatency(conf, sequence, maxLatency);
+ AccumuloOutputFormat.setMaxMutationBufferSize(conf, sequence, maxMutationBufferSize);
+ AccumuloOutputFormat.setMaxWriteThreads(conf, sequence, maxWriteThreads);
+
+ configureOutputFormat(conf);
}
@SuppressWarnings("rawtypes")
@@ -298,7 +310,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
}
public void cleanupOnFailure(String failure, Job job) {}
-
+
public void cleanupOnSuccess(String location, Job job) {}
@Override
@@ -310,7 +322,6 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
schema = s;
getUDFProperties().setProperty(contextSignature + "_schema", ObjectSerializer.serialize(schema));
}
-
protected Text tupleToText(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
Object o = tuple.get(i);
@@ -341,7 +352,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
}
- protected long objToLong(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
+ protected long objToLong(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
Object o = tuple.get(i);
byte type = schemaToType(o, i, fieldSchemas);
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/904604db/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
index 5f4ecf2..e9f0297 100644
--- a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
@@ -77,20 +77,20 @@ public class AbstractAccumuloStorageTest {
return expected;
}
- public Job getExpectedStoreJob(String inst, String zookeepers, String user, String password, String table, long maxWriteBufferSize, int writeThreads,
+ public Job getExpectedStoreJob(int sequence, String inst, String zookeepers, String user, String password, String table, long maxWriteBufferSize, int writeThreads,
int maxWriteLatencyMS) throws IOException {
Job expected = new Job();
Configuration expectedConf = expected.getConfiguration();
- AccumuloOutputFormat.setOutputInfo(expectedConf, user, password.getBytes(), true, table);
- AccumuloOutputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers);
- AccumuloOutputFormat.setMaxLatency(expectedConf, maxWriteLatencyMS);
- AccumuloOutputFormat.setMaxMutationBufferSize(expectedConf, maxWriteBufferSize);
- AccumuloOutputFormat.setMaxWriteThreads(expectedConf, writeThreads);
+ AccumuloOutputFormat.setOutputInfo(expectedConf, sequence,user, password.getBytes(), true, table);
+ AccumuloOutputFormat.setZooKeeperInstance(expectedConf, sequence,inst, zookeepers);
+ AccumuloOutputFormat.setMaxLatency(expectedConf, sequence,maxWriteLatencyMS);
+ AccumuloOutputFormat.setMaxMutationBufferSize(expectedConf, sequence,maxWriteBufferSize);
+ AccumuloOutputFormat.setMaxWriteThreads(expectedConf, sequence,writeThreads);
return expected;
}
- public Job getDefaultExpectedStoreJob() throws IOException {
+ public Job getDefaultExpectedStoreJob(int sequence) throws IOException {
String inst = "myinstance";
String zookeepers = "127.0.0.1:2181";
String user = "root";
@@ -100,7 +100,7 @@ public class AbstractAccumuloStorageTest {
int writeThreads = 7;
int maxWriteLatencyMS = 30000;
- Job expected = getExpectedStoreJob(inst, zookeepers, user, password, table, maxWriteBufferSize, writeThreads, maxWriteLatencyMS);
+ Job expected = getExpectedStoreJob(sequence, inst, zookeepers, user, password, table, maxWriteBufferSize, writeThreads, maxWriteLatencyMS);
return expected;
}
@@ -150,7 +150,7 @@ public class AbstractAccumuloStorageTest {
s.setStoreLocation(getDefaultStoreLocation(), actual);
Configuration actualConf = actual.getConfiguration();
- Job expected = getDefaultExpectedStoreJob();
+ Job expected = getDefaultExpectedStoreJob(1);
Configuration expectedConf = expected.getConfiguration();
TestUtils.assertConfigurationsEqual(expectedConf, actualConf);