You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2013/10/19 01:53:17 UTC

[10/13] git commit: changed pom and Accumulo pig code to reflect changes in underlying Accumulo APIs

changed pom and Accumulo pig code to reflect changes in underlying Accumulo APIs


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/9d9c5fab
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/9d9c5fab
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/9d9c5fab

Branch: refs/heads/1.5
Commit: 9d9c5fab60cb0707fd0d6f316683b10978ce9e54
Parents: 4c6cb82
Author: Jason Trost <jt...@endgame.com>
Authored: Mon Mar 25 15:25:08 2013 -0400
Committer: Jason Trost <jt...@endgame.com>
Committed: Mon Mar 25 15:25:08 2013 -0400

----------------------------------------------------------------------
 pom.xml                                         | 48 ++++++++++--
 .../accumulo/pig/AbstractAccumuloStorage.java   | 81 ++++++++++++--------
 .../apache/accumulo/pig/AccumuloStorage.java    | 54 +++++++------
 .../java/org/apache/accumulo/pig/Utils.java     |  5 ++
 4 files changed, 127 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/9d9c5fab/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6c7bbd3..0b36bbf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
   <groupId>org.apache.accumulo</groupId>
   <artifactId>accumulo-pig</artifactId>
   <packaging>jar</packaging>
-  <version>1.4.0</version>
+  <version>1.5.0-SNAPSHOT</version>
   <name>accumulo-pig</name>
   <url>http://maven.apache.org</url>
 
@@ -36,35 +36,44 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-core</artifactId>
-      <version>0.20.2</version>
+      <version>1.1.1</version>
     </dependency>
 
     <dependency>
       <groupId>org.apache.pig</groupId>
       <artifactId>pig</artifactId>
-      <version>0.9.2</version>
+      <version>0.11.0</version>
+    </dependency>
+
+    <!-- Needed by Apache Pig 0.11.0 -->
+    <dependency>
+        <groupId>joda-time</groupId>
+        <artifactId>joda-time</artifactId>
+        <version>1.6</version>
     </dependency>
 
 	<dependency>
         <groupId>org.apache.accumulo</groupId>
         <artifactId>accumulo-core</artifactId>
-    <version>1.4.0</version>
+    <version>1.5.0-SNAPSHOT</version>
       </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>zookeeper</artifactId>
-    <version>3.3.1</version>
+        <version>3.3.1</version>
       </dependency>
       <dependency>
         <groupId>org.apache.thrift</groupId>
         <artifactId>libthrift</artifactId>
-    <version>0.6.1</version>
+        <version>0.9.0</version>
       </dependency>
+<!--
       <dependency>
         <groupId>org.apache.accumulo</groupId>
         <artifactId>cloudtrace</artifactId>
     <version>1.4.0</version>
-      </dependency>
+      </dependency> 
+-->
 
 
   </dependencies>
@@ -79,6 +88,31 @@
       <target>1.6</target>
    </configuration>
 </plugin>
+
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+          <archive>
+            <manifest>
+              <mainClass/>
+            </manifest>
+          </archive>
+        </configuration>
+        <executions>
+          <execution>
+            <id>make-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+
  </plugins>
 </build>
 

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/9d9c5fab/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 9c5ed75..82329d3 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -21,9 +21,14 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.mapreduce.lib.util.ConfiguratorBase;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
@@ -204,30 +209,32 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
         conf = job.getConfiguration();
         setLocationFromUri(location);
         
-        if(!conf.getBoolean(AccumuloInputFormat.class.getSimpleName()+".configured", false))
+        if(!ConfiguratorBase.isConnectorInfoSet(AccumuloInputFormat.class, conf))
         {
-        	AccumuloInputFormat.setInputInfo(conf, user, password.getBytes(), table, authorizations);
-            AccumuloInputFormat.setZooKeeperInstance(conf, inst, zookeepers);
+        	AccumuloInputFormat.setInputTableName(job, table);
+        	AccumuloInputFormat.setScanAuthorizations(job, authorizations);
+            AccumuloInputFormat.setZooKeeperInstance(job, inst, zookeepers);
+            
+        	try {
+				AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes()));
+			} catch (AccumuloSecurityException e) {
+				throw new IOException(e);
+			}
+        	
             if(columnFamilyColumnQualifierPairs.size() > 0)
             {
             	LOG.info("columns: "+columnFamilyColumnQualifierPairs);
-            	AccumuloInputFormat.fetchColumns(conf, columnFamilyColumnQualifierPairs);
+            	AccumuloInputFormat.fetchColumns(job, columnFamilyColumnQualifierPairs);
             }
             
-            AccumuloInputFormat.setRanges(conf, Collections.singleton(new Range(start, end)));
+            AccumuloInputFormat.setRanges(job, Collections.singleton(new Range(start, end)));
             configureInputFormat(conf);
         }
     }
 
-    protected void configureInputFormat(Configuration conf)
-    {
-    	
-    }
+    protected void configureInputFormat(Configuration conf){}
     
-    protected void configureOutputFormat(Configuration conf)
-    {
-    	
-    }
+    protected void configureOutputFormat(Configuration conf){}
     
     @Override
     public String relativeToAbsolutePath(String location, Path curDir) throws IOException
@@ -236,16 +243,10 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
     }
 
     @Override
-    public void setUDFContextSignature(String signature)
-    {
-        
-    }
+    public void setUDFContextSignature(String signature){}
 
     /* StoreFunc methods */
-    public void setStoreFuncUDFContextSignature(String signature)
-    {
-        
-    }
+    public void setStoreFuncUDFContextSignature(String signature){}
 
     public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
     {
@@ -256,15 +257,32 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
     {
         conf = job.getConfiguration();
         setLocationFromUri(location);
-        
-        if(!conf.getBoolean(AccumuloOutputFormat.class.getSimpleName()+".configured", false))
-        {
-        	AccumuloOutputFormat.setOutputInfo(conf, user, password.getBytes(), true, table);
-            AccumuloOutputFormat.setZooKeeperInstance(conf, inst, zookeepers);
-            AccumuloOutputFormat.setMaxLatency(conf, maxLatency);
-            AccumuloOutputFormat.setMaxMutationBufferSize(conf, maxMutationBufferSize);
-            AccumuloOutputFormat.setMaxWriteThreads(conf, maxWriteThreads);
-            configureOutputFormat(conf);
+
+        try
+        { 
+           if(!ConfiguratorBase.isConnectorInfoSet(AccumuloOutputFormat.class, conf))
+           {
+        	   BatchWriterConfig bwConfig = new BatchWriterConfig();
+        	   bwConfig.setMaxLatency(maxLatency, TimeUnit.MILLISECONDS);
+        	   bwConfig.setMaxMemory(maxMutationBufferSize);
+        	   bwConfig.setMaxWriteThreads(maxWriteThreads);
+        	   AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
+        	   
+        	   AccumuloOutputFormat.setZooKeeperInstance(job, inst, zookeepers);
+        	   AccumuloOutputFormat.setDefaultTableName(job, table);
+        	   AccumuloOutputFormat.setCreateTables(job, true);
+        	   
+				try {
+					AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes()));
+				} catch (AccumuloSecurityException e) {
+					throw new IOException(e);
+				}
+
+                configureOutputFormat(conf);
+           }
+        }
+        catch(java.lang.IllegalStateException e1){
+            e1.printStackTrace();
         }
     }
 
@@ -299,4 +317,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
     }
 
     public void cleanupOnFailure(String failure, Job job){}
+    
+	@Override
+	public void cleanupOnSuccess(String location, Job job) throws IOException {}
 }

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/9d9c5fab/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index 0803aa6..5b146d6 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -64,29 +64,35 @@ public class AccumuloStorage extends AbstractAccumuloStorage
 	
 	@Override
 	public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
-		Mutation mut = new Mutation(Utils.objToText(tuple.get(0)));
-        Text cf = Utils.objToText(tuple.get(1));
-    	Text cq = Utils.objToText(tuple.get(2));
-    	
-        if(tuple.size() > 4)
-        {
-        	Text cv = Utils.objToText(tuple.get(3));
-        	Value val = new Value(Utils.objToBytes(tuple.get(4)));
-        	if(cv.getLength() == 0)
-        	{
-        		mut.put(cf, cq, val);
-        	}
-        	else
-        	{
-        		mut.put(cf, cq, new ColumnVisibility(cv), val);
-        	}
-        }
-        else
-        {
-        	Value val = new Value(Utils.objToBytes(tuple.get(3)));
-        	mut.put(cf, cq, val);
-        }
-        
-        return Collections.singleton(mut);
+		
+		try {
+			Mutation mut = new Mutation(Utils.objToText(tuple.get(0)));
+			Text cf = Utils.objToText(tuple.get(1));
+			Text cq = Utils.objToText(tuple.get(2));
+			
+			if(tuple.size() > 4)
+			{
+				Text cv = Utils.objToText(tuple.get(3));
+				Value val = new Value(Utils.objToBytes(tuple.get(4)));
+				if(cv.getLength() == 0)
+				{
+					mut.put(cf, cq, val);
+				}
+				else
+				{
+					mut.put(cf, cq, new ColumnVisibility(cv), val);
+				}
+			}
+			else
+			{
+				Value val = new Value(Utils.objToBytes(tuple.get(3)));
+				mut.put(cf, cq, val);
+			}
+			
+			return Collections.singleton(mut);
+		} catch (IOException e) {
+		System.err.println("Error on Tuple: "+tuple);
+		throw e;
+	}
 	}
 }

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/9d9c5fab/src/main/java/org/apache/accumulo/pig/Utils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/Utils.java b/src/main/java/org/apache/accumulo/pig/Utils.java
index e43c078..6b7fdf9 100644
--- a/src/main/java/org/apache/accumulo/pig/Utils.java
+++ b/src/main/java/org/apache/accumulo/pig/Utils.java
@@ -27,6 +27,11 @@ public class Utils {
     
     public static byte[] objToBytes(Object o)
     {
+    	if( o == null)
+    	{
+    		return new byte[0];
+    	}
+    	
     	if (o instanceof String) {
 			String str = (String) o;
 			return str.getBytes();