You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2013/11/07 06:24:03 UTC

[1/5] git commit: ACCUMULO-1783 Add in maven-compiler-plugin configuration to make eclipse stop defaulting to 1.5

Updated Branches:
  refs/heads/ACCUMULO-1783 30fd9aa6c -> 22498f775


ACCUMULO-1783 Add in maven-compiler-plugin configuration to make eclipse
stop defaulting to 1.5


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/dd212693
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/dd212693
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/dd212693

Branch: refs/heads/ACCUMULO-1783
Commit: dd212693c8ed9f8d17ef6639f0e65d6d96ffb7d0
Parents: 30fd9aa
Author: Josh Elser <el...@apache.org>
Authored: Sun Nov 3 22:20:20 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Sun Nov 3 22:20:20 2013 -0500

----------------------------------------------------------------------
 pom.xml | 13 +++++++++++++
 1 file changed, 13 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/dd212693/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b096123..963dab6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,6 +20,19 @@
   <artifactId>accumulo-pig</artifactId>
   <version>1.4.4-SNAPSHOT</version>
   
+  <build>
+  	<plugins>
+  		<plugin>
+  			<artifactId>maven-compiler-plugin</artifactId>
+  			<version>3.1</version>
+  			<configuration>
+  				<source>1.6</source>
+  				<target>1.6</target>
+  			</configuration>
+  		</plugin>
+  	</plugins>
+  </build>
+  
   <dependencies>
     <dependency>
       <groupId>org.apache.pig</groupId>


[5/5] git commit: ACCUMULO-1783 Rework a bit of the storage set(Store)Locations code to account for the upstream changes in the input format code.

Posted by el...@apache.org.
ACCUMULO-1783 Rework a bit of the storage set(Store)Locations code to
account for the upstream changes in the input format code.


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/22498f77
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/22498f77
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/22498f77

Branch: refs/heads/ACCUMULO-1783
Commit: 22498f775db53351d61996f94bfd027a8dbbdf0b
Parents: 904604d
Author: Josh Elser <el...@apache.org>
Authored: Thu Nov 7 00:22:09 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Thu Nov 7 00:22:09 2013 -0500

----------------------------------------------------------------------
 .../accumulo/pig/AbstractAccumuloStorage.java   | 304 ++++++++++++++++---
 .../pig/AbstractAccumuloStorageTest.java        |  25 +-
 .../accumulo/pig/AccumuloPigClusterTest.java    |   4 -
 .../pig/AccumuloWholeRowStorageTest.java        |  11 +-
 4 files changed, 279 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/22498f77/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 9473753..1d53371 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -16,24 +16,31 @@
  */
 package org.apache.accumulo.pig;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.mapreduce.SequencedFormatHelper;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -48,8 +55,8 @@ import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadStoreCaster;
 import org.apache.pig.ResourceSchema;
-import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.DataBag;
@@ -72,7 +79,8 @@ import org.joda.time.DateTime;
 public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreFuncInterface {
   private static final Log LOG = LogFactory.getLog(AbstractAccumuloStorage.class);
   
-  private static final String COLON = ":", COMMA = ",";
+  private static final String COLON = ":", COMMA = ",", PERIOD = ".";
+  private static final String INPUT_PREFIX = AccumuloInputFormat.class.getSimpleName(), OUTPUT_PREFIX = AccumuloOutputFormat.class.getSimpleName();
   
   private Configuration conf;
   private RecordReader<Key,Value> reader;
@@ -204,36 +212,40 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
     conf = job.getConfiguration();
     setLocationFromUri(location);
     
-    int sequence = AccumuloInputFormat.nextSequence();
-    
-    // TODO Something more.. "certain".
-    if (conf.getBoolean(AccumuloInputFormat.class.getSimpleName() + ".configured." + sequence, false)) {
-      LOG.warn("InputFormat already configured for " + sequence);
-      return;
-      // throw new RuntimeException("Was provided sequence number which was already configured: " + sequence);
-    }
-    
-    AccumuloInputFormat.setInputInfo(conf, sequence, user, password.getBytes(), table, authorizations);
-    AccumuloInputFormat.setZooKeeperInstance(conf, sequence, inst, zookeepers);
-    if (columnFamilyColumnQualifierPairs.size() > 0) {
-      LOG.info("columns: " + columnFamilyColumnQualifierPairs);
-      AccumuloInputFormat.fetchColumns(conf, sequence, columnFamilyColumnQualifierPairs);
+    Map<String,String> entries = AccumuloInputFormat.getRelevantEntries(conf);
+
+    if (shouldSetInput(entries)) {
+      int sequence = AccumuloInputFormat.nextSequence(conf);
+      
+      // TODO Something more.. "certain".
+      if (conf.getBoolean(AccumuloInputFormat.class.getSimpleName() + ".configured." + sequence, false)) {
+        LOG.warn("InputFormat already configured for " + sequence);
+        return;
+        // throw new RuntimeException("Was provided sequence number which was already configured: " + sequence);
+      }
+      
+      AccumuloInputFormat.setInputInfo(conf, sequence, user, password.getBytes(), table, authorizations);
+      AccumuloInputFormat.setZooKeeperInstance(conf, sequence, inst, zookeepers);
+      if (columnFamilyColumnQualifierPairs.size() > 0) {
+        LOG.info("columns: " + columnFamilyColumnQualifierPairs);
+        AccumuloInputFormat.fetchColumns(conf, sequence, columnFamilyColumnQualifierPairs);
+      }
+      
+      Collection<Range> ranges = Collections.singleton(new Range(start, end));
+      
+      LOG.info("Scanning Accumulo for " + ranges);
+      
+      AccumuloInputFormat.setRanges(conf, sequence, ranges);
+      
+      configureInputFormat(conf, sequence);
     }
-    
-    Collection<Range> ranges = Collections.singleton(new Range(start, end));
-    
-    LOG.info("Scanning Accumulo for " + ranges);
-    
-    AccumuloInputFormat.setRanges(conf, sequence, ranges);
-    
-    configureInputFormat(conf, sequence);
   }
   
   protected void configureInputFormat(Configuration conf, int sequence) {
     
   }
   
-  protected void configureOutputFormat(Configuration conf) {
+  protected void configureOutputFormat(Configuration conf, int sequence) {
     
   }
   
@@ -268,22 +280,240 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
     conf = job.getConfiguration();
     setLocationFromUri(location);
     
-    int sequence = AccumuloOutputFormat.nextSequence();
+    Map<String,String> entries = AccumuloOutputFormat.getRelevantEntries(conf);
     
-    // TODO Something more.. "certain".
-    if (conf.getBoolean(AccumuloOutputFormat.class.getSimpleName() + ".configured." + sequence, false)) {
-      LOG.warn("OutputFormat already configured for " + sequence);
-      return;
-      // throw new RuntimeException("Was provided sequence number which was already configured: " + sequence);
+    if (shouldSetOutput(entries)) {
+      int sequence = AccumuloOutputFormat.nextSequence(conf);
+      
+      // TODO Something more.. "certain".
+      if (conf.getBoolean(AccumuloOutputFormat.class.getSimpleName() + ".configured." + sequence, false)) {
+        LOG.warn("OutputFormat already configured for " + sequence);
+        return;
+        // throw new RuntimeException("Was provided sequence number which was already configured: " + sequence);
+      }
+      
+      AccumuloOutputFormat.setOutputInfo(conf, sequence, user, password.getBytes(), true, table);
+      AccumuloOutputFormat.setZooKeeperInstance(conf, sequence, inst, zookeepers);
+      AccumuloOutputFormat.setMaxLatency(conf, sequence, maxLatency);
+      AccumuloOutputFormat.setMaxMutationBufferSize(conf, sequence, maxMutationBufferSize);
+      AccumuloOutputFormat.setMaxWriteThreads(conf, sequence, maxWriteThreads);
+      
+      configureOutputFormat(conf, sequence);
+    } else {
+      LOG.debug("Not setting configuration as it appears that it has already been set");
     }
+  }
+  
+  private boolean shouldSetInput(Map<String,String> configEntries) {
+    Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries);
     
-    AccumuloOutputFormat.setOutputInfo(conf, sequence, user, password.getBytes(), true, table);
-    AccumuloOutputFormat.setZooKeeperInstance(conf, sequence, inst, zookeepers);
-    AccumuloOutputFormat.setMaxLatency(conf, sequence, maxLatency);
-    AccumuloOutputFormat.setMaxMutationBufferSize(conf, sequence, maxMutationBufferSize);
-    AccumuloOutputFormat.setMaxWriteThreads(conf, sequence, maxWriteThreads);
+    for (Map<String,String> group : groupedConfigEntries.values()) {
+      if (null != inst) {
+        if (!inst.equals(group.get(INPUT_PREFIX + ".instanceName"))) {
+          continue;
+        }
+      } else if (null != group.get(INPUT_PREFIX + ".instanceName")) {
+        continue;
+      }
+      
+      if (null != zookeepers) {
+        if (!zookeepers.equals(group.get(INPUT_PREFIX + ".zooKeepers"))) {
+          continue;
+        }
+      } else if (null != group.get(INPUT_PREFIX + ".zooKeepers")) {
+        continue;
+      }
+      
+      if (null != user) {
+        if (!user.equals(group.get(INPUT_PREFIX + ".username"))) {
+          continue;
+        }
+      } else if (null != group.get(INPUT_PREFIX + ".username")) {
+        continue;
+      }
+      
+      if (null != password) {
+        if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(INPUT_PREFIX + ".password"))) {
+          continue;
+        }
+      } else if (null != group.get(INPUT_PREFIX + ".password")) {
+        continue;
+      }
+      
+      if (null != table) {
+        if (!table.equals(group.get(INPUT_PREFIX + ".tablename"))) {
+          continue;
+        }
+      } else if (null != group.get(INPUT_PREFIX + ".tablename")) {
+        continue;
+      }
+      
+      if (null != authorizations) {
+        if (!authorizations.serialize().equals(group.get(INPUT_PREFIX + ".authorizations"))) {
+          continue;
+        }
+      } else if (null != group.get(INPUT_PREFIX + ".authorizations")) {
+        continue;
+      }
+      
+      String columnValues = group.get(INPUT_PREFIX + ".columns");
+      if (null != columnFamilyColumnQualifierPairs) {
+        StringBuilder expected = new StringBuilder(128);
+        for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
+          if (0 < expected.length()) {
+            expected.append(COMMA);
+          }
+          
+          expected.append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst()))));
+          if (column.getSecond() != null)
+            expected.append(COLON).append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond()))));
+        }
+        
+        if (!expected.toString().equals(columnValues)) {
+          continue;
+        }
+      } else if (null != columnValues){ 
+        continue;
+      }
+      
+      Range expected = new Range(start, end);
+      String serializedRanges = group.get(INPUT_PREFIX + ".ranges");
+      if (null != serializedRanges) {
+        try {
+          // We currently only support serializing one Range into the Configuration from this Storage class
+          ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serializedRanges.getBytes()));
+          Range range = new Range();
+          range.readFields(new DataInputStream(bais));
+          
+          if (!expected.equals(range)) {
+            continue;
+          }
+        } catch (IOException e) {
+          // Got an exception, they don't match
+          continue;
+        }
+      }
+      
+      
+      // We found a group of entries in the config which are (similar to) what
+      // we would have set.
+      return false;
+    }
+    
+    // We didn't find any entries that seemed to match, write the config
+    return true;
+  }
+  
+  private boolean shouldSetOutput(Map<String,String> configEntries) {
+    Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries);
+    
+    for (Map<String,String> group : groupedConfigEntries.values()) {
+      if (null != inst) {
+        if (!inst.equals(group.get(OUTPUT_PREFIX + ".instanceName"))) {
+          continue;
+        }
+      } else if (null != group.get(OUTPUT_PREFIX + ".instanceName")) {
+        continue;
+      }
+      
+      if (null != zookeepers) {
+        if (!zookeepers.equals(group.get(OUTPUT_PREFIX + ".zooKeepers"))) {
+          continue;
+        }
+      } else if (null != group.get(OUTPUT_PREFIX + ".zooKeepers")) {
+        continue;
+      }
+      
+      if (null != user) {
+        if (!user.equals(group.get(OUTPUT_PREFIX + ".username"))) {
+          continue;
+        }
+      } else if (null != group.get(OUTPUT_PREFIX + ".username")) {
+        continue;
+      }
+      
+      if (null != password) {
+        if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(OUTPUT_PREFIX + ".password"))) {
+          continue;
+        }
+      } else if (null != group.get(OUTPUT_PREFIX + ".password")) {
+        continue;
+      }
+      
+      if (null != table) {
+        if (!table.equals(group.get(OUTPUT_PREFIX + ".defaulttable"))) {
+          continue;
+        }
+      } else if (null != group.get(OUTPUT_PREFIX + ".defaulttable")) {
+        continue;
+      }
+      
+      String writeThreadsStr = group.get(OUTPUT_PREFIX + ".writethreads");
+      try {
+        if (null == writeThreadsStr || maxWriteThreads != Integer.parseInt(writeThreadsStr)) {
+            continue;
+          }
+      } catch (NumberFormatException e) {
+        // Wasn't a number, so it can't match what we were expecting
+        continue;
+      }
+      
+      String mutationBufferStr = group.get(OUTPUT_PREFIX + ".maxmemory");
+      try {
+        if (null == mutationBufferStr || maxMutationBufferSize != Long.parseLong(mutationBufferStr)) {
+            continue;
+          }
+      } catch (NumberFormatException e) {
+        // Wasn't a number, so it can't match what we were expecting
+        continue;
+      }
+      
+      String maxLatencyStr = group.get(OUTPUT_PREFIX + ".maxlatency");
+      try {
+        if (null == maxLatencyStr || maxLatency != Long.parseLong(maxLatencyStr)) {
+            continue;
+          }
+      } catch (NumberFormatException e) {
+        // Wasn't a number, so it can't match what we were expecting
+        continue;
+      }
+      
+      // We found a group of entries in the config which are (similar to) what
+      // we would have set.
+      return false;
+    }
+    
+    // We didn't find any entries that seemed to match, write the config
+    return true;
+  }
+  
+  private Map<Integer,Map<String,String>> permuteConfigEntries(Map<String,String> entries) {
+    Map<Integer,Map<String,String>> groupedEntries = new HashMap<Integer,Map<String,String>>();
+    for (Entry<String,String> entry : entries.entrySet()) {
+      final String key = entry.getKey(), value = entry.getValue();
+      
+      if (key.endsWith(SequencedFormatHelper.CONFIGURED_SEQUENCES)) {
+        continue;
+      }
+      
+      int index = key.lastIndexOf(PERIOD);
+      if (-1 != index) {
+        int group = Integer.parseInt(key.substring(index + 1));
+        String name = key.substring(0, index);
+        
+        Map<String,String> entriesInGroup = groupedEntries.get(group);
+        if (null == entriesInGroup) {
+          entriesInGroup = new HashMap<String,String>();
+          groupedEntries.put(group, entriesInGroup);
+        }
+        
+        entriesInGroup.put(name, value);
+      } else {
+        LOG.warn("Could not parse key: " + key);
+      }
+    }
     
-    configureOutputFormat(conf);
+    return groupedEntries;
   }
   
   @SuppressWarnings("rawtypes")

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/22498f77/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
index e9f0297..bc886ec 100644
--- a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
@@ -33,24 +33,18 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.data.Tuple;
-import org.junit.Before;
 import org.junit.Test;
 
 public class AbstractAccumuloStorageTest {
   
-  @Before
-  public void setup() {
-    AccumuloInputFormat.resetCounters();
-    AccumuloOutputFormat.resetCounters();
-  }
-  
-  public Job getExpectedLoadJob(int sequence, String inst, String zookeepers, String user, String password, String table, String start, String end,
+  public Job getExpectedLoadJob(String inst, String zookeepers, String user, String password, String table, String start, String end,
       Authorizations authorizations, List<Pair<Text,Text>> columnFamilyColumnQualifierPairs) throws IOException {
     Collection<Range> ranges = new LinkedList<Range>();
     ranges.add(new Range(start, end));
     
     Job expected = new Job();
     Configuration expectedConf = expected.getConfiguration();
+    final int sequence = AccumuloInputFormat.nextSequence(expectedConf);
     AccumuloInputFormat.setInputInfo(expectedConf, sequence, user, password.getBytes(), table, authorizations);
     AccumuloInputFormat.setZooKeeperInstance(expectedConf, sequence, inst, zookeepers);
     AccumuloInputFormat.fetchColumns(expectedConf, sequence, columnFamilyColumnQualifierPairs);
@@ -58,7 +52,7 @@ public class AbstractAccumuloStorageTest {
     return expected;
   }
   
-  public Job getDefaultExpectedLoadJob(int sequence) throws IOException {
+  public Job getDefaultExpectedLoadJob() throws IOException {
     String inst = "myinstance";
     String zookeepers = "127.0.0.1:2181";
     String user = "root";
@@ -73,14 +67,15 @@ public class AbstractAccumuloStorageTest {
     columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text("col2"), new Text("cq2")));
     columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text("col3"), null));
     
-    Job expected = getExpectedLoadJob(sequence, inst, zookeepers, user, password, table, start, end, authorizations, columnFamilyColumnQualifierPairs);
+    Job expected = getExpectedLoadJob(inst, zookeepers, user, password, table, start, end, authorizations, columnFamilyColumnQualifierPairs);
     return expected;
   }
   
-  public Job getExpectedStoreJob(int sequence, String inst, String zookeepers, String user, String password, String table, long maxWriteBufferSize, int writeThreads,
+  public Job getExpectedStoreJob(String inst, String zookeepers, String user, String password, String table, long maxWriteBufferSize, int writeThreads,
       int maxWriteLatencyMS) throws IOException {
     Job expected = new Job();
     Configuration expectedConf = expected.getConfiguration();
+    final int sequence = AccumuloOutputFormat.nextSequence(expectedConf);
     AccumuloOutputFormat.setOutputInfo(expectedConf, sequence,user, password.getBytes(), true, table);
     AccumuloOutputFormat.setZooKeeperInstance(expectedConf, sequence,inst, zookeepers);
     AccumuloOutputFormat.setMaxLatency(expectedConf, sequence,maxWriteLatencyMS);
@@ -90,7 +85,7 @@ public class AbstractAccumuloStorageTest {
     return expected;
   }
   
-  public Job getDefaultExpectedStoreJob(int sequence) throws IOException {
+  public Job getDefaultExpectedStoreJob() throws IOException {
     String inst = "myinstance";
     String zookeepers = "127.0.0.1:2181";
     String user = "root";
@@ -100,7 +95,7 @@ public class AbstractAccumuloStorageTest {
     int writeThreads = 7;
     int maxWriteLatencyMS = 30000;
     
-    Job expected = getExpectedStoreJob(sequence, inst, zookeepers, user, password, table, maxWriteBufferSize, writeThreads, maxWriteLatencyMS);
+    Job expected = getExpectedStoreJob(inst, zookeepers, user, password, table, maxWriteBufferSize, writeThreads, maxWriteLatencyMS);
     return expected;
   }
   
@@ -136,7 +131,7 @@ public class AbstractAccumuloStorageTest {
     s.setLocation(getDefaultLoadLocation(), actual);
     Configuration actualConf = actual.getConfiguration();
     
-    Job expected = getDefaultExpectedLoadJob(1);
+    Job expected = getDefaultExpectedLoadJob();
     Configuration expectedConf = expected.getConfiguration();
     
     TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
@@ -150,7 +145,7 @@ public class AbstractAccumuloStorageTest {
     s.setStoreLocation(getDefaultStoreLocation(), actual);
     Configuration actualConf = actual.getConfiguration();
     
-    Job expected = getDefaultExpectedStoreJob(1);
+    Job expected = getDefaultExpectedStoreJob();
     Configuration expectedConf = expected.getConfiguration();
     
     TestUtils.assertConfigurationsEqual(expectedConf, actualConf);

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/22498f77/src/test/java/org/apache/accumulo/pig/AccumuloPigClusterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloPigClusterTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloPigClusterTest.java
index 0e2abb5..10b0a2a 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloPigClusterTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloPigClusterTest.java
@@ -10,8 +10,6 @@ import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.accumulo.minicluster.MiniAccumuloConfig;
@@ -54,8 +52,6 @@ public class AccumuloPigClusterTest {
   
   @Before
   public void beforeTest() throws Exception {
-    AccumuloInputFormat.resetCounters();
-    AccumuloOutputFormat.resetCounters();
     pig = new PigServer(ExecType.LOCAL, conf);
   }
   

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/22498f77/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
index 3a0ab85..b57c7ba 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
@@ -30,7 +30,6 @@ import java.util.List;
 
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
 import org.apache.accumulo.core.data.ColumnUpdate;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
@@ -43,16 +42,9 @@ import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DefaultDataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
-import org.junit.Before;
 import org.junit.Test;
 
 public class AccumuloWholeRowStorageTest {
-
-  @Before
-  public void setup() {
-    AccumuloInputFormat.resetCounters();
-    AccumuloOutputFormat.resetCounters();
-  }
   
   @Test
   public void testConfiguration() throws IOException {
@@ -64,9 +56,10 @@ public class AccumuloWholeRowStorageTest {
     s.setLocation(test.getDefaultLoadLocation(), actual);
     Configuration actualConf = actual.getConfiguration();
     
+    // A little brittle. We know this is the sequence number used when we create the DefaultExpectedLoadJob()
     final int sequence = 1;
     
-    Job expected = test.getDefaultExpectedLoadJob(sequence);
+    Job expected = test.getDefaultExpectedLoadJob();
     Configuration expectedConf = expected.getConfiguration();
     AccumuloInputFormat.addIterator(expectedConf, sequence, new IteratorSetting(50, WholeRowIterator.class));
     


[3/5] git commit: ACCUMULO-1783 Lift some pig test classes to write a better "functional" test that ensures that joins actually work.

Posted by el...@apache.org.
ACCUMULO-1783 Lift some pig test classes to write a better "functional"
test that ensures that joins actually work.


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/9b398d4a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/9b398d4a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/9b398d4a

Branch: refs/heads/ACCUMULO-1783
Commit: 9b398d4a32e50d3503b4ecb2f86a306e9db0221b
Parents: d72e1cb
Author: Josh Elser <el...@apache.org>
Authored: Tue Nov 5 17:14:33 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Tue Nov 5 17:14:33 2013 -0500

----------------------------------------------------------------------
 .../accumulo/pig/AccumuloPigClusterTest.java    | 165 +++++++++++++++++++
 .../java/org/apache/pig/test/MiniCluster.java   |  86 ++++++++++
 .../org/apache/pig/test/MiniGenericCluster.java | 123 ++++++++++++++
 3 files changed, 374 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/9b398d4a/src/test/java/org/apache/accumulo/pig/AccumuloPigClusterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloPigClusterTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloPigClusterTest.java
new file mode 100644
index 0000000..0e2abb5
--- /dev/null
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloPigClusterTest.java
@@ -0,0 +1,165 @@
+package org.apache.accumulo.pig;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.accumulo.minicluster.MiniAccumuloConfig;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+public class AccumuloPigClusterTest {
+  
+  private static final File tmpdir = Files.createTempDir();
+  private static MiniAccumuloCluster accumuloCluster;
+  private static MiniCluster cluster;
+  private static Configuration conf;
+  private PigServer pig;
+  
+  @BeforeClass
+  public static void setupClusters() throws Exception {
+    MiniAccumuloConfig macConf = new MiniAccumuloConfig(tmpdir, "password");
+    macConf.setNumTservers(1);
+    
+    accumuloCluster = new MiniAccumuloCluster(macConf);
+    accumuloCluster.start();
+    
+    // This is needed by Pig
+    cluster = MiniCluster.buildCluster();
+    conf = cluster.getConfiguration();
+  }
+  
+  @Before
+  public void beforeTest() throws Exception {
+    AccumuloInputFormat.resetCounters();
+    AccumuloOutputFormat.resetCounters();
+    pig = new PigServer(ExecType.LOCAL, conf);
+  }
+  
+  @AfterClass
+  public static void stopClusters() throws Exception {
+    accumuloCluster.stop();
+    FileUtils.deleteDirectory(tmpdir);
+  }
+  
+  private void loadTestData() throws Exception {
+    ZooKeeperInstance inst = new ZooKeeperInstance(accumuloCluster.getInstanceName(), accumuloCluster.getZooKeepers());
+    Connector c = inst.getConnector("root", "password");
+    
+    TableOperations tops = c.tableOperations();
+    if (!tops.exists("airports")) {
+      tops.create("airports");
+    }
+    
+    if (!tops.exists("flights")) {
+      tops.create("flights");
+    }
+    
+    @SuppressWarnings("unchecked")
+    final List<ImmutableMap<String,String>> airportData = Lists.newArrayList(ImmutableMap.of("code", "SJC", "name", "San Jose"),
+        ImmutableMap.of("code", "SFO", "name", "San Francisco"), ImmutableMap.of("code", "MDO", "name", "Orlando"),
+        ImmutableMap.of("code", "MDW", "name", "Chicago-Midway"), ImmutableMap.of("code", "JFK", "name", "JFK International"),
+        ImmutableMap.of("code", "BWI", "name", "Baltimore-Washington"));
+    
+    BatchWriter bw = c.createBatchWriter("airports", 100000l, 1000l, 1);
+    try {
+      int i = 1;
+      for (Map<String,String> record : airportData) {
+        Mutation m = new Mutation(Integer.toString(i));
+        
+        for (Entry<String,String> entry : record.entrySet()) {
+          m.put(entry.getKey(), "", entry.getValue());
+        }
+        
+        bw.addMutation(m);
+        i++;
+      }
+    } finally {
+      if (null != bw) {
+        bw.close();
+      }
+    }
+    
+    @SuppressWarnings("unchecked")
+    final List<ImmutableMap<String,String>> flightData = Lists.newArrayList(ImmutableMap.of("origin", "BWI", "destination", "SFO"),
+        ImmutableMap.of("origin", "BWI", "destination", "SJC"), ImmutableMap.of("origin", "MDW", "destination", "MDO"),
+        ImmutableMap.of("origin", "MDO", "destination", "SJC"), ImmutableMap.of("origin", "SJC", "destination", "JFK"),
+        ImmutableMap.of("origin", "JFK", "destination", "MDW"));
+    
+    bw = c.createBatchWriter("flights", 100000l, 1000l, 1);
+    try {
+      int i = 1;
+      for (Map<String,String> record : flightData) {
+        Mutation m = new Mutation(Integer.toString(i));
+        
+        for (Entry<String,String> entry : record.entrySet()) {
+          m.put(entry.getKey(), "", entry.getValue());
+        }
+        
+        bw.addMutation(m);
+        i++;
+      }
+    } finally {
+      if (null != bw) {
+        bw.close();
+      }
+    }
+  }
+  
+  @Test
+  public void test() throws Exception {
+    loadTestData();
+    
+    final String loadFlights = "flights = LOAD 'accumulo://flights?instance=" + accumuloCluster.getInstanceName() + 
+        "&user=root&password=password&zookeepers=" + accumuloCluster.getZooKeepers() + "' using org.apache.accumulo.pig.AccumuloStorage()" + 
+        " as (rowKey:chararray, column_map:map[]);";
+    
+    final String loadAirports = "airports = LOAD 'accumulo://airports?instance=" + accumuloCluster.getInstanceName() + 
+        "&user=root&password=password&zookeepers=" + accumuloCluster.getZooKeepers() + "' using org.apache.accumulo.pig.AccumuloStorage()" + 
+        " as (rowKey:chararray, column_map:map[]);";
+    
+    final String joinQuery = "joined = JOIN flights BY column_map#'origin', airports BY column_map#'code';";
+    
+    // System.out.println(query);
+    
+    pig.registerQuery(loadFlights);
+    pig.registerQuery(loadAirports);
+    pig.registerQuery(joinQuery);
+    
+    Iterator<Tuple> it = pig.openIterator("joined");
+    
+    int i = 0;
+    while (it.hasNext()) {
+      Tuple t = it.next();
+      System.out.println(t);
+      i++;
+    }
+    
+    // TODO actually verify something here
+    Assert.assertTrue("Should have found records but found none", i > 0);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/9b398d4a/src/test/java/org/apache/pig/test/MiniCluster.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/pig/test/MiniCluster.java b/src/test/java/org/apache/pig/test/MiniCluster.java
new file mode 100644
index 0000000..64467ae
--- /dev/null
+++ b/src/test/java/org/apache/pig/test/MiniCluster.java
@@ -0,0 +1,86 @@
+package org.apache.pig.test;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.MiniMRCluster;
+
+/**
+ * Test mini cluster made of a {@link MiniDFSCluster} with 4 data nodes and a
+ * {@link MiniMRCluster} with 4 task trackers. The resulting job configuration is
+ * written to {@code build/classes/hadoop-site.xml} and removed again on shutdown.
+ */
+public class MiniCluster extends MiniGenericCluster {
+    private static final File CONF_DIR = new File("build/classes");
+    private static final File CONF_FILE = new File(CONF_DIR, "hadoop-site.xml");
+
+    // Deliberately has NO explicit "= null" initializer. The MiniGenericCluster
+    // constructor calls setupMiniDfsAndMrClusters(), which assigns this field,
+    // and Java runs this class's field initializers only after that constructor
+    // returns. An explicit initializer would overwrite the started cluster with
+    // null, so shutdownMiniMrClusters() would never stop it.
+    private MiniMRCluster m_mr;
+
+    public MiniCluster() {
+        super();
+    }
+
+    /**
+     * Starts the mini DFS and MapReduce clusters, writes their configuration to
+     * hadoop-site.xml and publishes the system properties needed by Pig.
+     *
+     * @throws RuntimeException wrapping any IOException raised during startup
+     */
+    @Override
+    protected void setupMiniDfsAndMrClusters() {
+        try {
+            System.setProperty("hadoop.log.dir", "build/test/logs");
+            final int dataNodes = 4;     // There will be 4 data nodes
+            final int taskTrackers = 4;  // There will be 4 task tracker nodes
+
+            // Create the dir that holds hadoop-site.xml file
+            // Delete if hadoop-site.xml exists already
+            CONF_DIR.mkdirs();
+            if (CONF_FILE.exists()) {
+                CONF_FILE.delete();
+            }
+
+            // Builds and starts the mini dfs and mapreduce clusters
+            Configuration config = new Configuration();
+            m_dfs = new MiniDFSCluster(config, dataNodes, true, null);
+            m_fileSys = m_dfs.getFileSystem();
+            m_mr = new MiniMRCluster(taskTrackers, m_fileSys.getUri().toString(), 1);
+
+            // Write the necessary config info to hadoop-site.xml
+            m_conf = m_mr.createJobConf();
+            m_conf.setInt("mapred.submit.replication", 2);
+            m_conf.set("dfs.datanode.address", "0.0.0.0:0");
+            m_conf.set("dfs.datanode.http.address", "0.0.0.0:0");
+            m_conf.set("mapred.map.max.attempts", "2");
+            m_conf.set("mapred.reduce.max.attempts", "2");
+            m_conf.set("pig.jobcontrol.sleep", "100");
+
+            // Close the stream even when writeXml fails so the file handle is
+            // not leaked (try/finally because the build targets Java 1.6).
+            FileOutputStream confOut = new FileOutputStream(CONF_FILE);
+            try {
+                m_conf.writeXml(confOut);
+            } finally {
+                confOut.close();
+            }
+
+            // Set the system properties needed by Pig
+            System.setProperty("cluster", m_conf.get("mapred.job.tracker"));
+            System.setProperty("namenode", m_conf.get("fs.default.name"));
+            System.setProperty("junit.hadoop.conf", CONF_DIR.getPath());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Deletes the generated hadoop-site.xml and shuts down the mini MapReduce
+     * cluster if one is running.
+     */
+    @Override
+    protected void shutdownMiniMrClusters() {
+        // Delete hadoop-site.xml on shutDown
+        if (CONF_FILE.exists()) {
+            CONF_FILE.delete();
+        }
+        if (m_mr != null) {
+            m_mr.shutdown();
+        }
+        m_mr = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/9b398d4a/src/test/java/org/apache/pig/test/MiniGenericCluster.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/pig/test/MiniGenericCluster.java b/src/test/java/org/apache/pig/test/MiniGenericCluster.java
new file mode 100644
index 0000000..584631a
--- /dev/null
+++ b/src/test/java/org/apache/pig/test/MiniGenericCluster.java
@@ -0,0 +1,123 @@
+package org.apache.pig.test;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import java.io.*;
+import java.util.Properties;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+
+/**
+ * This class builds a single instance of itself with the Singleton 
+ * design pattern. While building the single instance, it sets up a 
+ * mini cluster that actually consists of a mini DFS cluster and a 
+ * mini MapReduce cluster on the local machine and also sets up the 
+ * environment for Pig to run on top of the mini cluster.
+ *
+ * This class is the base class for MiniCluster, which has slightly
+ * difference among different versions of hadoop. MiniCluster implementation
+ * is located in $PIG_HOME/shims.
+ */
+abstract public class MiniGenericCluster {
+    protected MiniDFSCluster m_dfs = null;
+    protected FileSystem m_fileSys = null;
+    protected Configuration m_conf = null;
+    
+    protected final static MiniCluster INSTANCE = new MiniCluster();
+    protected static boolean isSetup = true;
+    
+    protected MiniGenericCluster() {
+        setupMiniDfsAndMrClusters();
+    }
+    
+    abstract protected void setupMiniDfsAndMrClusters();
+    
+    /**
+     * Returns the single instance of class MiniClusterBuilder that
+     * represents the resouces for a mini dfs cluster and a mini 
+     * mapreduce cluster. 
+     */
+    public static MiniCluster buildCluster() {
+        if(! isSetup){
+            INSTANCE.setupMiniDfsAndMrClusters();
+            isSetup = true;
+        }
+        return INSTANCE;
+    }
+
+    public void shutDown(){
+        INSTANCE.shutdownMiniDfsAndMrClusters();
+    }
+    
+    protected void finalize() {
+        shutdownMiniDfsAndMrClusters();
+    }
+    
+    protected void shutdownMiniDfsAndMrClusters() {
+        isSetup = false;
+        shutdownMiniDfsClusters();
+        shutdownMiniMrClusters();
+    }
+    
+    protected void shutdownMiniDfsClusters() {
+        try {
+            if (m_fileSys != null) { m_fileSys.close(); }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        if (m_dfs != null) { m_dfs.shutdown(); }
+        m_fileSys = null;
+        m_dfs = null;
+    }
+    
+    abstract protected void shutdownMiniMrClusters();
+
+    public Properties getProperties() {
+        errorIfNotSetup();
+        return ConfigurationUtil.toProperties(m_conf);
+    }
+
+    public Configuration getConfiguration() {
+        return new Configuration(m_conf);
+    }
+
+    public void setProperty(String name, String value) {
+        errorIfNotSetup();
+        m_conf.set(name, value);
+    }
+    
+    public FileSystem getFileSystem() {
+        errorIfNotSetup();
+        return m_fileSys;
+    }
+    
+    /**
+     * Throw RunTimeException if isSetup is false
+     */
+    private void errorIfNotSetup(){
+        if(isSetup)
+            return;
+        String msg = "function called on MiniCluster that has been shutdown";
+        throw new RuntimeException(msg);
+    }
+}


[2/5] git commit: ACCUMULO-1783 Use the sequence id to ensure AIF and AOF don't clobber one another.

Posted by el...@apache.org.
ACCUMULO-1783 Use the sequence id to ensure AIF and AOF don't clobber
one another.


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/d72e1cb5
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/d72e1cb5
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/d72e1cb5

Branch: refs/heads/ACCUMULO-1783
Commit: d72e1cb56b1c16bd09bff28d2d72163063781d63
Parents: dd21269
Author: Josh Elser <el...@apache.org>
Authored: Tue Nov 5 17:08:08 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Tue Nov 5 17:08:08 2013 -0500

----------------------------------------------------------------------
 pom.xml                                         | 17 +++++++--
 .../accumulo/pig/AbstractAccumuloStorage.java   | 36 +++++++++++---------
 .../apache/accumulo/pig/AccumuloStorage.java    |  7 ++--
 .../accumulo/pig/AccumuloWholeRowStorage.java   |  5 +--
 .../pig/AbstractAccumuloStorageTest.java        | 23 ++++++++-----
 .../pig/AccumuloWholeRowStorageTest.java        | 14 ++++++--
 6 files changed, 69 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/d72e1cb5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 963dab6..0c82c39 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,6 +30,14 @@
   				<target>1.6</target>
   			</configuration>
   		</plugin>
+  		<plugin>
+			<artifactId>maven-surefire-plugin</artifactId>
+			<version>2.16</version>
+			<configuration>
+				<argLine>-Xmx4g</argLine>
+                <redirectTestOutputToFile>true</redirectTestOutputToFile>
+			</configuration>
+		</plugin>
   	</plugins>
   </build>
   
@@ -50,9 +58,14 @@
       <version>1.2.1</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <version>1.2.1</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.accumulo</groupId>
       <artifactId>accumulo-core</artifactId>
-      <version>1.4.4</version>
+      <version>1.4.5-SNAPSHOT</version>
     </dependency>
     <dependency>
       <groupId>joda-time</groupId>
@@ -73,7 +86,7 @@
     <dependency>
       <groupId>org.apache.accumulo</groupId>
       <artifactId>accumulo-minicluster</artifactId>
-      <version>1.4.4</version>
+      <version>1.4.5-SNAPSHOT</version>
       <scope>test</scope>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/d72e1cb5/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 37efe84..da4a51b 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -204,25 +204,29 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
     conf = job.getConfiguration();
     setLocationFromUri(location);
     
-    if (!conf.getBoolean(AccumuloInputFormat.class.getSimpleName() + ".configured", false)) {
-      AccumuloInputFormat.setInputInfo(conf, user, password.getBytes(), table, authorizations);
-      AccumuloInputFormat.setZooKeeperInstance(conf, inst, zookeepers);
-      if (columnFamilyColumnQualifierPairs.size() > 0) {
-        LOG.info("columns: " + columnFamilyColumnQualifierPairs);
-        AccumuloInputFormat.fetchColumns(conf, columnFamilyColumnQualifierPairs);
-      }
-      
-      Collection<Range> ranges = Collections.singleton(new Range(start, end));
-      
-      LOG.info("Scanning Accumulo for " + ranges);
-      
-      AccumuloInputFormat.setRanges(conf, ranges);
-      
-      configureInputFormat(conf);
+    int sequence = AccumuloInputFormat.nextSequence();
+    
+    if (conf.getBoolean(AccumuloInputFormat.class.getSimpleName() + ".configured", false)) {
+      throw new RuntimeException("Was provided sequence number which was already configured: " + sequence);
     }
+    
+    AccumuloInputFormat.setInputInfo(conf, sequence, user, password.getBytes(), table, authorizations);
+    AccumuloInputFormat.setZooKeeperInstance(conf, sequence, inst, zookeepers);
+    if (columnFamilyColumnQualifierPairs.size() > 0) {
+      LOG.info("columns: " + columnFamilyColumnQualifierPairs);
+      AccumuloInputFormat.fetchColumns(conf, sequence, columnFamilyColumnQualifierPairs);
+    }
+    
+    Collection<Range> ranges = Collections.singleton(new Range(start, end));
+    
+    LOG.info("Scanning Accumulo for " + ranges);
+    
+    AccumuloInputFormat.setRanges(conf, sequence, ranges);
+    
+    configureInputFormat(conf, sequence);
   }
   
-  protected void configureInputFormat(Configuration conf) {
+  protected void configureInputFormat(Configuration conf, int sequence) {
     
   }
   

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/d72e1cb5/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index dcfd888..bd43dce 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -141,8 +141,9 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
     return map;
   }
   
-  protected void configureInputFormat(Configuration conf) {
-    AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class));
+  @Override
+  protected void configureInputFormat(Configuration conf, int sequence) {
+    AccumuloInputFormat.addIterator(conf, sequence, new IteratorSetting(50, WholeRowIterator.class));
   }
   
   @Override
@@ -229,7 +230,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
    */
   protected void addColumn(Mutation mutation, String columnDef, String columnName, Value columnValue) {
     if (null == columnDef && null == columnName) {
-      // TODO Emit a counter here somehow?
+      // TODO Emit a counter here somehow? org.apache.pig.tools.pigstats.PigStatusReporter
       log.warn("Was provided no name or definition for column. Ignoring value");
       return;
     }

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/d72e1cb5/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
index af3ee01..499558f 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
@@ -83,8 +83,9 @@ public class AccumuloWholeRowStorage extends AbstractAccumuloStorage {
     return tuple;
   }
   
-  protected void configureInputFormat(Configuration conf) {
-    AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class));
+  @Override
+  protected void configureInputFormat(Configuration conf, int sequence) {
+    AccumuloInputFormat.addIterator(conf, sequence, new IteratorSetting(50, WholeRowIterator.class));
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/d72e1cb5/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
index 1b5b81a..5f4ecf2 100644
--- a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
@@ -33,25 +33,32 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.data.Tuple;
+import org.junit.Before;
 import org.junit.Test;
 
 public class AbstractAccumuloStorageTest {
   
-  public Job getExpectedLoadJob(String inst, String zookeepers, String user, String password, String table, String start, String end,
+  @Before
+  public void setup() {
+    AccumuloInputFormat.resetCounters();
+    AccumuloOutputFormat.resetCounters();
+  }
+  
+  public Job getExpectedLoadJob(int sequence, String inst, String zookeepers, String user, String password, String table, String start, String end,
       Authorizations authorizations, List<Pair<Text,Text>> columnFamilyColumnQualifierPairs) throws IOException {
     Collection<Range> ranges = new LinkedList<Range>();
     ranges.add(new Range(start, end));
     
     Job expected = new Job();
     Configuration expectedConf = expected.getConfiguration();
-    AccumuloInputFormat.setInputInfo(expectedConf, user, password.getBytes(), table, authorizations);
-    AccumuloInputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers);
-    AccumuloInputFormat.fetchColumns(expectedConf, columnFamilyColumnQualifierPairs);
-    AccumuloInputFormat.setRanges(expectedConf, ranges);
+    AccumuloInputFormat.setInputInfo(expectedConf, sequence, user, password.getBytes(), table, authorizations);
+    AccumuloInputFormat.setZooKeeperInstance(expectedConf, sequence, inst, zookeepers);
+    AccumuloInputFormat.fetchColumns(expectedConf, sequence, columnFamilyColumnQualifierPairs);
+    AccumuloInputFormat.setRanges(expectedConf, sequence, ranges);
     return expected;
   }
   
-  public Job getDefaultExpectedLoadJob() throws IOException {
+  public Job getDefaultExpectedLoadJob(int sequence) throws IOException {
     String inst = "myinstance";
     String zookeepers = "127.0.0.1:2181";
     String user = "root";
@@ -66,7 +73,7 @@ public class AbstractAccumuloStorageTest {
     columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text("col2"), new Text("cq2")));
     columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text("col3"), null));
     
-    Job expected = getExpectedLoadJob(inst, zookeepers, user, password, table, start, end, authorizations, columnFamilyColumnQualifierPairs);
+    Job expected = getExpectedLoadJob(sequence, inst, zookeepers, user, password, table, start, end, authorizations, columnFamilyColumnQualifierPairs);
     return expected;
   }
   
@@ -129,7 +136,7 @@ public class AbstractAccumuloStorageTest {
     s.setLocation(getDefaultLoadLocation(), actual);
     Configuration actualConf = actual.getConfiguration();
     
-    Job expected = getDefaultExpectedLoadJob();
+    Job expected = getDefaultExpectedLoadJob(1);
     Configuration expectedConf = expected.getConfiguration();
     
     TestUtils.assertConfigurationsEqual(expectedConf, actualConf);

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/d72e1cb5/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
index 690d86c..3a0ab85 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
@@ -30,6 +30,7 @@ import java.util.List;
 
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
 import org.apache.accumulo.core.data.ColumnUpdate;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
@@ -42,9 +43,16 @@ import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DefaultDataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.junit.Before;
 import org.junit.Test;
 
 public class AccumuloWholeRowStorageTest {
+
+  @Before
+  public void setup() {
+    AccumuloInputFormat.resetCounters();
+    AccumuloOutputFormat.resetCounters();
+  }
   
   @Test
   public void testConfiguration() throws IOException {
@@ -56,9 +64,11 @@ public class AccumuloWholeRowStorageTest {
     s.setLocation(test.getDefaultLoadLocation(), actual);
     Configuration actualConf = actual.getConfiguration();
     
-    Job expected = test.getDefaultExpectedLoadJob();
+    final int sequence = 1;
+    
+    Job expected = test.getDefaultExpectedLoadJob(sequence);
     Configuration expectedConf = expected.getConfiguration();
-    AccumuloInputFormat.addIterator(expectedConf, new IteratorSetting(50, WholeRowIterator.class));
+    AccumuloInputFormat.addIterator(expectedConf, sequence, new IteratorSetting(50, WholeRowIterator.class));
     
     TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
   }


[4/5] git commit: ACCUMULO-1783 Update the setStoreLocation method and tests.

Posted by el...@apache.org.
ACCUMULO-1783 Update the setStoreLocation method and tests.


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/904604db
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/904604db
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/904604db

Branch: refs/heads/ACCUMULO-1783
Commit: 904604db28adad26bc0ba02e7c78db15db6424e8
Parents: 9b398d4
Author: Josh Elser <el...@apache.org>
Authored: Tue Nov 5 20:05:08 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Tue Nov 5 20:05:08 2013 -0500

----------------------------------------------------------------------
 .../accumulo/pig/AbstractAccumuloStorage.java   | 39 +++++++++++++-------
 .../pig/AbstractAccumuloStorageTest.java        | 18 ++++-----
 2 files changed, 34 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/904604db/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index da4a51b..9473753 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -94,7 +94,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
   int maxWriteThreads = 10;
   long maxMutationBufferSize = 10 * 1000 * 1000;
   int maxLatency = 10 * 1000;
-
+  
   protected LoadStoreCaster caster;
   protected ResourceSchema schema;
   protected String contextSignature = null;
@@ -206,8 +206,11 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
     
     int sequence = AccumuloInputFormat.nextSequence();
     
-    if (conf.getBoolean(AccumuloInputFormat.class.getSimpleName() + ".configured", false)) {
-      throw new RuntimeException("Was provided sequence number which was already configured: " + sequence);
+    // TODO Something more.. "certain".
+    if (conf.getBoolean(AccumuloInputFormat.class.getSimpleName() + ".configured." + sequence, false)) {
+      LOG.warn("InputFormat already configured for " + sequence);
+      return;
+      // throw new RuntimeException("Was provided sequence number which was already configured: " + sequence);
     }
     
     AccumuloInputFormat.setInputInfo(conf, sequence, user, password.getBytes(), table, authorizations);
@@ -248,7 +251,8 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
   public void setStoreFuncUDFContextSignature(String signature) {
     this.contextSignature = signature;
     
-  }  
+  }
+  
   /**
    * Returns UDFProperties based on <code>contextSignature</code>.
    */
@@ -264,14 +268,22 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
     conf = job.getConfiguration();
     setLocationFromUri(location);
     
-    if (!conf.getBoolean(AccumuloOutputFormat.class.getSimpleName() + ".configured", false)) {
-      AccumuloOutputFormat.setOutputInfo(conf, user, password.getBytes(), true, table);
-      AccumuloOutputFormat.setZooKeeperInstance(conf, inst, zookeepers);
-      AccumuloOutputFormat.setMaxLatency(conf, maxLatency);
-      AccumuloOutputFormat.setMaxMutationBufferSize(conf, maxMutationBufferSize);
-      AccumuloOutputFormat.setMaxWriteThreads(conf, maxWriteThreads);
-      configureOutputFormat(conf);
+    int sequence = AccumuloOutputFormat.nextSequence();
+    
+    // TODO Something more.. "certain".
+    if (conf.getBoolean(AccumuloOutputFormat.class.getSimpleName() + ".configured." + sequence, false)) {
+      LOG.warn("OutputFormat already configured for " + sequence);
+      return;
+      // throw new RuntimeException("Was provided sequence number which was already configured: " + sequence);
     }
+    
+    AccumuloOutputFormat.setOutputInfo(conf, sequence, user, password.getBytes(), true, table);
+    AccumuloOutputFormat.setZooKeeperInstance(conf, sequence, inst, zookeepers);
+    AccumuloOutputFormat.setMaxLatency(conf, sequence, maxLatency);
+    AccumuloOutputFormat.setMaxMutationBufferSize(conf, sequence, maxMutationBufferSize);
+    AccumuloOutputFormat.setMaxWriteThreads(conf, sequence, maxWriteThreads);
+    
+    configureOutputFormat(conf);
   }
   
   @SuppressWarnings("rawtypes")
@@ -298,7 +310,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
   }
   
   public void cleanupOnFailure(String failure, Job job) {}
-
+  
   public void cleanupOnSuccess(String location, Job job) {}
   
   @Override
@@ -310,7 +322,6 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
     schema = s;
     getUDFProperties().setProperty(contextSignature + "_schema", ObjectSerializer.serialize(schema));
   }
-
   
   protected Text tupleToText(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
     Object o = tuple.get(i);
@@ -341,7 +352,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
     
   }
   
-  protected long objToLong(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException { 
+  protected long objToLong(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
     Object o = tuple.get(i);
     byte type = schemaToType(o, i, fieldSchemas);
     

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/904604db/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
index 5f4ecf2..e9f0297 100644
--- a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
@@ -77,20 +77,20 @@ public class AbstractAccumuloStorageTest {
     return expected;
   }
   
-  public Job getExpectedStoreJob(String inst, String zookeepers, String user, String password, String table, long maxWriteBufferSize, int writeThreads,
+  public Job getExpectedStoreJob(int sequence, String inst, String zookeepers, String user, String password, String table, long maxWriteBufferSize, int writeThreads,
       int maxWriteLatencyMS) throws IOException {
     Job expected = new Job();
     Configuration expectedConf = expected.getConfiguration();
-    AccumuloOutputFormat.setOutputInfo(expectedConf, user, password.getBytes(), true, table);
-    AccumuloOutputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers);
-    AccumuloOutputFormat.setMaxLatency(expectedConf, maxWriteLatencyMS);
-    AccumuloOutputFormat.setMaxMutationBufferSize(expectedConf, maxWriteBufferSize);
-    AccumuloOutputFormat.setMaxWriteThreads(expectedConf, writeThreads);
+    AccumuloOutputFormat.setOutputInfo(expectedConf, sequence,user, password.getBytes(), true, table);
+    AccumuloOutputFormat.setZooKeeperInstance(expectedConf, sequence,inst, zookeepers);
+    AccumuloOutputFormat.setMaxLatency(expectedConf, sequence,maxWriteLatencyMS);
+    AccumuloOutputFormat.setMaxMutationBufferSize(expectedConf, sequence,maxWriteBufferSize);
+    AccumuloOutputFormat.setMaxWriteThreads(expectedConf, sequence,writeThreads);
     
     return expected;
   }
   
-  public Job getDefaultExpectedStoreJob() throws IOException {
+  public Job getDefaultExpectedStoreJob(int sequence) throws IOException {
     String inst = "myinstance";
     String zookeepers = "127.0.0.1:2181";
     String user = "root";
@@ -100,7 +100,7 @@ public class AbstractAccumuloStorageTest {
     int writeThreads = 7;
     int maxWriteLatencyMS = 30000;
     
-    Job expected = getExpectedStoreJob(inst, zookeepers, user, password, table, maxWriteBufferSize, writeThreads, maxWriteLatencyMS);
+    Job expected = getExpectedStoreJob(sequence, inst, zookeepers, user, password, table, maxWriteBufferSize, writeThreads, maxWriteLatencyMS);
     return expected;
   }
   
@@ -150,7 +150,7 @@ public class AbstractAccumuloStorageTest {
     s.setStoreLocation(getDefaultStoreLocation(), actual);
     Configuration actualConf = actual.getConfiguration();
     
-    Job expected = getDefaultExpectedStoreJob();
+    Job expected = getDefaultExpectedStoreJob(1);
     Configuration expectedConf = expected.getConfiguration();
     
     TestUtils.assertConfigurationsEqual(expectedConf, actualConf);