You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2013/11/24 06:53:30 UTC
[1/2] git commit: ACCUMULO-1783 Update code to work with upstream 1.5 changes.
Updated Branches:
refs/heads/ACCUMULO-1783-1.5 [created] cb720e850
ACCUMULO-1783 Update code to work with upstream 1.5 changes.
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/0ce6fb34
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/0ce6fb34
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/0ce6fb34
Branch: refs/heads/ACCUMULO-1783-1.5
Commit: 0ce6fb34a852050643aae5ad5feb5c21c4b6b94e
Parents: 170229a
Author: Josh Elser <el...@apache.org>
Authored: Fri Nov 22 18:00:15 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Fri Nov 22 18:00:15 2013 -0500
----------------------------------------------------------------------
pom.xml | 11 ++-
.../accumulo/pig/AbstractAccumuloStorage.java | 79 ++++++++++++--------
.../apache/accumulo/pig/AccumuloStorage.java | 7 +-
.../accumulo/pig/AccumuloWholeRowStorage.java | 6 +-
.../pig/AbstractAccumuloStorageTest.java | 48 ++++++++----
5 files changed, 98 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/0ce6fb34/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0c82c39..587bf1e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,7 +18,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-pig</artifactId>
- <version>1.4.4-SNAPSHOT</version>
+ <version>1.5.1-SNAPSHOT</version>
<build>
<plugins>
@@ -65,7 +65,7 @@
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
- <version>1.4.5-SNAPSHOT</version>
+ <version>1.5.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
@@ -78,6 +78,11 @@
<version>15.0</version>
</dependency>
<dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.16</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.5</version>
@@ -86,7 +91,7 @@
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-minicluster</artifactId>
- <version>1.4.5-SNAPSHOT</version>
+ <version>1.5.1-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/0ce6fb34/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index a829d4a..890abf3 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -26,10 +26,14 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
import java.util.Properties;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
@@ -77,7 +81,6 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
private static final String COLON = ":", COMMA = ",";
private static final String INPUT_PREFIX = AccumuloInputFormat.class.getSimpleName();
- private Configuration conf;
private RecordReader<Key,Value> reader;
private RecordWriter<Text,Mutation> writer;
@@ -104,6 +107,10 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
public AbstractAccumuloStorage() {}
+ protected Map<String,String> getInputFormatEntries(Configuration conf) {
+ return getEntries(conf, INPUT_PREFIX);
+ }
+
@Override
public Tuple getNext() throws IOException {
try {
@@ -202,10 +209,6 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
return writer;
}
- protected Map<String,String> getInputFormatEntries(Configuration conf) {
- return getEntries(conf, INPUT_PREFIX);
- }
-
protected Map<String,String> getEntries(Configuration conf, String prefix) {
Map<String,String> entries = new HashMap<String,String>();
@@ -221,44 +224,50 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
@Override
public void setLocation(String location, Job job) throws IOException {
- conf = job.getConfiguration();
setLocationFromUri(location);
- Map<String,String> entries = getInputFormatEntries(conf);
-
+ Map<String,String> entries = getInputFormatEntries(job.getConfiguration());
for (String key : entries.keySet()) {
- conf.unset(key);
+ job.getConfiguration().unset(key);
}
- AccumuloInputFormat.setInputInfo(conf, user, password.getBytes(), table, authorizations);
- AccumuloInputFormat.setZooKeeperInstance(conf, inst, zookeepers);
+ try {
+ AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(password));
+ } catch (AccumuloSecurityException e) {
+ throw new IOException(e);
+ }
+
+ AccumuloInputFormat.setInputTableName(job, table);
+ AccumuloInputFormat.setScanAuthorizations(job, authorizations);
+ AccumuloInputFormat.setZooKeeperInstance(job, inst, zookeepers);
+
if (columnFamilyColumnQualifierPairs.size() > 0) {
LOG.info("columns: " + columnFamilyColumnQualifierPairs);
- AccumuloInputFormat.fetchColumns(conf, columnFamilyColumnQualifierPairs);
+ AccumuloInputFormat.fetchColumns(job, columnFamilyColumnQualifierPairs);
}
Collection<Range> ranges = Collections.singleton(new Range(start, end));
LOG.info("Scanning Accumulo for " + ranges + " for table " + table);
- AccumuloInputFormat.setRanges(conf, ranges);
+ AccumuloInputFormat.setRanges(job, ranges);
- configureInputFormat(conf);
+ configureInputFormat(job);
}
/**
- * Method to allow specific implementations to add more elements to the Configuration for reading data from Accumulo.
+ * Method to allow specific implementations to add more elements to the Job for reading data from Accumulo.
*
- * @param conf
+ * @param job
*/
- protected void configureInputFormat(Configuration conf) {}
+ protected void configureInputFormat(Job job) {}
/**
- * Method to allow specific implementations to add more elements to the Configuration for writing data to Accumulo.
+ * Method to allow specific implementations to add more elements to the Job for writing data to Accumulo.
*
- * @param conf
+ * @param job
*/
- protected void configureOutputFormat(Configuration conf) {}
+ protected void configureOutputFormat(Job job) {}
@Override
public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
@@ -288,20 +297,30 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
}
public void setStoreLocation(String location, Job job) throws IOException {
- conf = job.getConfiguration();
setLocationFromUri(location);
-
- // TODO If Pig ever uses a MultipleOutputs-esque construct, this approach will fall apart
- if (conf.get(AccumuloOutputFormat.class.getSimpleName() + ".configured") == null) {
- AccumuloOutputFormat.setOutputInfo(conf, user, password.getBytes(), true, table);
- AccumuloOutputFormat.setZooKeeperInstance(conf, inst, zookeepers);
- AccumuloOutputFormat.setMaxLatency(conf, maxLatency);
- AccumuloOutputFormat.setMaxMutationBufferSize(conf, maxMutationBufferSize);
- AccumuloOutputFormat.setMaxWriteThreads(conf, maxWriteThreads);
+
+ // If Pig ever uses an approach like they handle inputs (load), this will fall apart.
+ // Currently, it appears that multiple stores will get new m/r jobs
+ if (job.getConfiguration().get(AccumuloOutputFormat.class.getSimpleName() + ".configured") == null) {
+ try {
+ AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(password));
+ } catch (AccumuloSecurityException e) {
+ throw new IOException(e);
+ }
+
+ // AccumuloOutputFormat.setCreateTables(job, true);
+ // AccumuloOutputFormat.setDefaultTableName(job, table);
+ AccumuloOutputFormat.setZooKeeperInstance(job, inst, zookeepers);
+
+ BatchWriterConfig bwConfig = new BatchWriterConfig();
+ bwConfig.setMaxLatency(maxLatency, TimeUnit.MILLISECONDS);
+ bwConfig.setMaxMemory(maxMutationBufferSize);
+ bwConfig.setMaxWriteThreads(maxWriteThreads);
+ AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
LOG.info("Writing data to " + table);
- configureOutputFormat(conf);
+ configureOutputFormat(job);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/0ce6fb34/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index 8e9cfef..742480c 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -18,15 +18,14 @@ import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;
import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.Logger;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.Utf8StorageConverter;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
-import org.apache.pig.data.InternalMap;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
@@ -142,8 +141,8 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
}
@Override
- protected void configureInputFormat(Configuration conf) {
- AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class));
+ protected void configureInputFormat(Job job) {
+ AccumuloInputFormat.addIterator(job, new IteratorSetting(50, WholeRowIterator.class));
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/0ce6fb34/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
index a6db638..784904f 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
@@ -33,8 +33,8 @@ import org.apache.accumulo.core.iterators.user.WholeRowIterator;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DefaultDataBag;
@@ -84,8 +84,8 @@ public class AccumuloWholeRowStorage extends AbstractAccumuloStorage {
}
@Override
- protected void configureInputFormat(Configuration conf) {
- AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class));
+ protected void configureInputFormat(Job job) {
+ AccumuloInputFormat.addIterator(job, new IteratorSetting(50, WholeRowIterator.class));
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/0ce6fb34/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
index 1b5b81a..21d4fc7 100644
--- a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
@@ -20,9 +20,13 @@ import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
@@ -33,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.data.Tuple;
+import org.junit.Assert;
import org.junit.Test;
public class AbstractAccumuloStorageTest {
@@ -42,12 +47,19 @@ public class AbstractAccumuloStorageTest {
Collection<Range> ranges = new LinkedList<Range>();
ranges.add(new Range(start, end));
- Job expected = new Job();
- Configuration expectedConf = expected.getConfiguration();
- AccumuloInputFormat.setInputInfo(expectedConf, user, password.getBytes(), table, authorizations);
- AccumuloInputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers);
- AccumuloInputFormat.fetchColumns(expectedConf, columnFamilyColumnQualifierPairs);
- AccumuloInputFormat.setRanges(expectedConf, ranges);
+ Job expected = new Job(new Configuration());
+
+ try {
+ AccumuloInputFormat.setConnectorInfo(expected, user, new PasswordToken(password));
+ } catch (AccumuloSecurityException e) {
+ Assert.fail(e.getMessage());
+ }
+ AccumuloInputFormat.setInputTableName(expected, table);
+ AccumuloInputFormat.setScanAuthorizations(expected, authorizations);
+ AccumuloInputFormat.setZooKeeperInstance(expected, inst, zookeepers);
+ AccumuloInputFormat.fetchColumns(expected, columnFamilyColumnQualifierPairs);
+ AccumuloInputFormat.setRanges(expected, ranges);
+
return expected;
}
@@ -72,13 +84,23 @@ public class AbstractAccumuloStorageTest {
public Job getExpectedStoreJob(String inst, String zookeepers, String user, String password, String table, long maxWriteBufferSize, int writeThreads,
int maxWriteLatencyMS) throws IOException {
- Job expected = new Job();
- Configuration expectedConf = expected.getConfiguration();
- AccumuloOutputFormat.setOutputInfo(expectedConf, user, password.getBytes(), true, table);
- AccumuloOutputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers);
- AccumuloOutputFormat.setMaxLatency(expectedConf, maxWriteLatencyMS);
- AccumuloOutputFormat.setMaxMutationBufferSize(expectedConf, maxWriteBufferSize);
- AccumuloOutputFormat.setMaxWriteThreads(expectedConf, writeThreads);
+
+ Job expected = new Job(new Configuration());
+
+ try {
+ AccumuloOutputFormat.setConnectorInfo(expected, user, new PasswordToken(password));
+ } catch (AccumuloSecurityException e) {
+ Assert.fail(e.getMessage());
+ }
+
+ AccumuloOutputFormat.setZooKeeperInstance(expected, inst, zookeepers);
+
+ BatchWriterConfig bwConfig = new BatchWriterConfig();
+ bwConfig.setMaxLatency(maxWriteLatencyMS, TimeUnit.MILLISECONDS);
+ bwConfig.setMaxMemory(maxWriteBufferSize);
+ bwConfig.setMaxWriteThreads(writeThreads);
+
+ AccumuloOutputFormat.setBatchWriterOptions(expected, bwConfig);
return expected;
}
[2/2] git commit: ACCUMULO-1783 Fix up 1.5 build
Posted by el...@apache.org.
ACCUMULO-1783 Fix up 1.5 build
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/cb720e85
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/cb720e85
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/cb720e85
Branch: refs/heads/ACCUMULO-1783-1.5
Commit: cb720e850f01668a857b0f22f15ca57f34ddffe9
Parents: 0ce6fb3
Author: Josh Elser <el...@apache.org>
Authored: Sat Nov 23 14:46:37 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Sat Nov 23 14:46:37 2013 -0500
----------------------------------------------------------------------
.../accumulo/pig/AbstractAccumuloStorage.java | 44 +++++++++++---------
.../pig/AbstractAccumuloStorageTest.java | 1 +
2 files changed, 26 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/cb720e85/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 890abf3..801b31f 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -79,7 +79,9 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
private static final Log LOG = LogFactory.getLog(AbstractAccumuloStorage.class);
private static final String COLON = ":", COMMA = ",";
+
private static final String INPUT_PREFIX = AccumuloInputFormat.class.getSimpleName();
+ private static final String OUTPUT_PREFIX = AccumuloOutputFormat.class.getSimpleName();
private RecordReader<Key,Value> reader;
private RecordWriter<Text,Mutation> writer;
@@ -110,6 +112,10 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
protected Map<String,String> getInputFormatEntries(Configuration conf) {
return getEntries(conf, INPUT_PREFIX);
}
+
+ protected Map<String,String> getOutputFormatEntries(Configuration conf) {
+ return getEntries(conf, OUTPUT_PREFIX);
+ }
@Override
public Tuple getNext() throws IOException {
@@ -298,30 +304,30 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
public void setStoreLocation(String location, Job job) throws IOException {
setLocationFromUri(location);
+
+ Map<String,String> entries = getOutputFormatEntries(job.getConfiguration());
+ for (String key : entries.keySet()) {
+ job.getConfiguration().unset(key);
+ }
- // If Pig ever uses an approach like they handle inputs (load), this will fall apart.
- // Currently, it appears that multiple stores will get new m/r jobs
- if (job.getConfiguration().get(AccumuloOutputFormat.class.getSimpleName() + ".configured") == null) {
- try {
- AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(password));
- } catch (AccumuloSecurityException e) {
- throw new IOException(e);
- }
+ try {
+ AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(password));
+ } catch (AccumuloSecurityException e) {
+ throw new IOException(e);
+ }
- // AccumuloOutputFormat.setCreateTables(job, true);
- // AccumuloOutputFormat.setDefaultTableName(job, table);
- AccumuloOutputFormat.setZooKeeperInstance(job, inst, zookeepers);
+ AccumuloOutputFormat.setCreateTables(job, true);
+ AccumuloOutputFormat.setZooKeeperInstance(job, inst, zookeepers);
- BatchWriterConfig bwConfig = new BatchWriterConfig();
- bwConfig.setMaxLatency(maxLatency, TimeUnit.MILLISECONDS);
- bwConfig.setMaxMemory(maxMutationBufferSize);
- bwConfig.setMaxWriteThreads(maxWriteThreads);
- AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
+ BatchWriterConfig bwConfig = new BatchWriterConfig();
+ bwConfig.setMaxLatency(maxLatency, TimeUnit.MILLISECONDS);
+ bwConfig.setMaxMemory(maxMutationBufferSize);
+ bwConfig.setMaxWriteThreads(maxWriteThreads);
+ AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
- LOG.info("Writing data to " + table);
+ LOG.info("Writing data to " + table);
- configureOutputFormat(job);
- }
+ configureOutputFormat(job);
}
@SuppressWarnings("rawtypes")
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/cb720e85/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
index 21d4fc7..7302a82 100644
--- a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
@@ -94,6 +94,7 @@ public class AbstractAccumuloStorageTest {
}
AccumuloOutputFormat.setZooKeeperInstance(expected, inst, zookeepers);
+ AccumuloOutputFormat.setCreateTables(expected, true);
BatchWriterConfig bwConfig = new BatchWriterConfig();
bwConfig.setMaxLatency(maxWriteLatencyMS, TimeUnit.MILLISECONDS);