You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2013/10/19 01:53:17 UTC
[10/13] git commit: changed pom and Accumulo pig code to reflect changes in underlying Accumulo APIs
changed pom and Accumulo pig code to reflect changes in underlying Accumulo APIs
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/9d9c5fab
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/9d9c5fab
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/9d9c5fab
Branch: refs/heads/1.5
Commit: 9d9c5fab60cb0707fd0d6f316683b10978ce9e54
Parents: 4c6cb82
Author: Jason Trost <jt...@endgame.com>
Authored: Mon Mar 25 15:25:08 2013 -0400
Committer: Jason Trost <jt...@endgame.com>
Committed: Mon Mar 25 15:25:08 2013 -0400
----------------------------------------------------------------------
pom.xml | 48 ++++++++++--
.../accumulo/pig/AbstractAccumuloStorage.java | 81 ++++++++++++--------
.../apache/accumulo/pig/AccumuloStorage.java | 54 +++++++------
.../java/org/apache/accumulo/pig/Utils.java | 5 ++
4 files changed, 127 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/9d9c5fab/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6c7bbd3..0b36bbf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-pig</artifactId>
<packaging>jar</packaging>
- <version>1.4.0</version>
+ <version>1.5.0-SNAPSHOT</version>
<name>accumulo-pig</name>
<url>http://maven.apache.org</url>
@@ -36,35 +36,44 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
- <version>0.20.2</version>
+ <version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>
- <version>0.9.2</version>
+ <version>0.11.0</version>
+ </dependency>
+
+ <!-- Needed by Apache Pig 0.11.0 -->
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>1.6</version>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
- <version>1.4.0</version>
+ <version>1.5.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>zookeeper</artifactId>
- <version>3.3.1</version>
+ <version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
- <version>0.6.1</version>
+ <version>0.9.0</version>
</dependency>
+<!--
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>cloudtrace</artifactId>
<version>1.4.0</version>
- </dependency>
+ </dependency>
+-->
</dependencies>
@@ -79,6 +88,31 @@
<target>1.6</target>
</configuration>
</plugin>
+
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ <archive>
+ <manifest>
+ <mainClass/>
+ </manifest>
+ </archive>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/9d9c5fab/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 9c5ed75..82329d3 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -21,9 +21,14 @@ import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.mapreduce.lib.util.ConfiguratorBase;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
@@ -204,30 +209,32 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
conf = job.getConfiguration();
setLocationFromUri(location);
- if(!conf.getBoolean(AccumuloInputFormat.class.getSimpleName()+".configured", false))
+ if(!ConfiguratorBase.isConnectorInfoSet(AccumuloInputFormat.class, conf))
{
- AccumuloInputFormat.setInputInfo(conf, user, password.getBytes(), table, authorizations);
- AccumuloInputFormat.setZooKeeperInstance(conf, inst, zookeepers);
+ AccumuloInputFormat.setInputTableName(job, table);
+ AccumuloInputFormat.setScanAuthorizations(job, authorizations);
+ AccumuloInputFormat.setZooKeeperInstance(job, inst, zookeepers);
+
+ try {
+ AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes()));
+ } catch (AccumuloSecurityException e) {
+ throw new IOException(e);
+ }
+
if(columnFamilyColumnQualifierPairs.size() > 0)
{
LOG.info("columns: "+columnFamilyColumnQualifierPairs);
- AccumuloInputFormat.fetchColumns(conf, columnFamilyColumnQualifierPairs);
+ AccumuloInputFormat.fetchColumns(job, columnFamilyColumnQualifierPairs);
}
- AccumuloInputFormat.setRanges(conf, Collections.singleton(new Range(start, end)));
+ AccumuloInputFormat.setRanges(job, Collections.singleton(new Range(start, end)));
configureInputFormat(conf);
}
}
- protected void configureInputFormat(Configuration conf)
- {
-
- }
+ protected void configureInputFormat(Configuration conf){}
- protected void configureOutputFormat(Configuration conf)
- {
-
- }
+ protected void configureOutputFormat(Configuration conf){}
@Override
public String relativeToAbsolutePath(String location, Path curDir) throws IOException
@@ -236,16 +243,10 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
}
@Override
- public void setUDFContextSignature(String signature)
- {
-
- }
+ public void setUDFContextSignature(String signature){}
/* StoreFunc methods */
- public void setStoreFuncUDFContextSignature(String signature)
- {
-
- }
+ public void setStoreFuncUDFContextSignature(String signature){}
public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
{
@@ -256,15 +257,32 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
{
conf = job.getConfiguration();
setLocationFromUri(location);
-
- if(!conf.getBoolean(AccumuloOutputFormat.class.getSimpleName()+".configured", false))
- {
- AccumuloOutputFormat.setOutputInfo(conf, user, password.getBytes(), true, table);
- AccumuloOutputFormat.setZooKeeperInstance(conf, inst, zookeepers);
- AccumuloOutputFormat.setMaxLatency(conf, maxLatency);
- AccumuloOutputFormat.setMaxMutationBufferSize(conf, maxMutationBufferSize);
- AccumuloOutputFormat.setMaxWriteThreads(conf, maxWriteThreads);
- configureOutputFormat(conf);
+
+ try
+ {
+ if(!ConfiguratorBase.isConnectorInfoSet(AccumuloOutputFormat.class, conf))
+ {
+ BatchWriterConfig bwConfig = new BatchWriterConfig();
+ bwConfig.setMaxLatency(maxLatency, TimeUnit.MILLISECONDS);
+ bwConfig.setMaxMemory(maxMutationBufferSize);
+ bwConfig.setMaxWriteThreads(maxWriteThreads);
+ AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
+
+ AccumuloOutputFormat.setZooKeeperInstance(job, inst, zookeepers);
+ AccumuloOutputFormat.setDefaultTableName(job, table);
+ AccumuloOutputFormat.setCreateTables(job, true);
+
+ try {
+ AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes()));
+ } catch (AccumuloSecurityException e) {
+ throw new IOException(e);
+ }
+
+ configureOutputFormat(conf);
+ }
+ }
+ catch(java.lang.IllegalStateException e1){
+ e1.printStackTrace();
}
}
@@ -299,4 +317,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
}
public void cleanupOnFailure(String failure, Job job){}
+
+ @Override
+ public void cleanupOnSuccess(String location, Job job) throws IOException {}
}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/9d9c5fab/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index 0803aa6..5b146d6 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -64,29 +64,35 @@ public class AccumuloStorage extends AbstractAccumuloStorage
@Override
public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
- Mutation mut = new Mutation(Utils.objToText(tuple.get(0)));
- Text cf = Utils.objToText(tuple.get(1));
- Text cq = Utils.objToText(tuple.get(2));
-
- if(tuple.size() > 4)
- {
- Text cv = Utils.objToText(tuple.get(3));
- Value val = new Value(Utils.objToBytes(tuple.get(4)));
- if(cv.getLength() == 0)
- {
- mut.put(cf, cq, val);
- }
- else
- {
- mut.put(cf, cq, new ColumnVisibility(cv), val);
- }
- }
- else
- {
- Value val = new Value(Utils.objToBytes(tuple.get(3)));
- mut.put(cf, cq, val);
- }
-
- return Collections.singleton(mut);
+
+ try {
+ Mutation mut = new Mutation(Utils.objToText(tuple.get(0)));
+ Text cf = Utils.objToText(tuple.get(1));
+ Text cq = Utils.objToText(tuple.get(2));
+
+ if(tuple.size() > 4)
+ {
+ Text cv = Utils.objToText(tuple.get(3));
+ Value val = new Value(Utils.objToBytes(tuple.get(4)));
+ if(cv.getLength() == 0)
+ {
+ mut.put(cf, cq, val);
+ }
+ else
+ {
+ mut.put(cf, cq, new ColumnVisibility(cv), val);
+ }
+ }
+ else
+ {
+ Value val = new Value(Utils.objToBytes(tuple.get(3)));
+ mut.put(cf, cq, val);
+ }
+
+ return Collections.singleton(mut);
+ } catch (IOException e) {
+ System.err.println("Error on Tuple: "+tuple);
+ throw e;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/9d9c5fab/src/main/java/org/apache/accumulo/pig/Utils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/Utils.java b/src/main/java/org/apache/accumulo/pig/Utils.java
index e43c078..6b7fdf9 100644
--- a/src/main/java/org/apache/accumulo/pig/Utils.java
+++ b/src/main/java/org/apache/accumulo/pig/Utils.java
@@ -27,6 +27,11 @@ public class Utils {
public static byte[] objToBytes(Object o)
{
+ if( o == null)
+ {
+ return new byte[0];
+ }
+
if (o instanceof String) {
String str = (String) o;
return str.getBytes();