You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2013/11/09 04:37:25 UTC

[1/2] git commit: ACCUMULO-1783 Rework the setLocation method to ignore the Configuration so we don't shoot ourselves in the foot.

Updated Branches:
  refs/heads/ACCUMULO-1783 22498f775 -> 4160c1615


ACCUMULO-1783 Rework the setLocation method to ignore the Configuration
so we don't shoot ourselves in the foot.


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/63d29d4d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/63d29d4d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/63d29d4d

Branch: refs/heads/ACCUMULO-1783
Commit: 63d29d4de56f1cc664b5a433dc2d736e3bc7a066
Parents: 22498f7
Author: Josh Elser <el...@apache.org>
Authored: Fri Nov 8 19:50:19 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Fri Nov 8 19:50:19 2013 -0500

----------------------------------------------------------------------
 .../accumulo/pig/AbstractAccumuloStorage.java   | 549 ++++++++++---------
 .../apache/accumulo/pig/AccumuloStorage.java    |   6 +-
 .../accumulo/pig/AccumuloWholeRowStorage.java   |   4 +-
 .../pig/AbstractAccumuloStorageTest.java        |  20 +-
 .../pig/AccumuloWholeRowStorageTest.java        |   5 +-
 5 files changed, 299 insertions(+), 285 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/63d29d4d/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 1d53371..5faf6c6 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -16,8 +16,6 @@
  */
 package org.apache.accumulo.pig;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -32,15 +30,12 @@ import java.util.Properties;
 
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.core.client.mapreduce.SequencedFormatHelper;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.Pair;
-import org.apache.accumulo.core.util.TextUtil;
-import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -207,45 +202,66 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
     return writer;
   }
   
+  protected Map<String,String> getInputFormatEntries(Configuration conf) {
+    return getEntries(conf, INPUT_PREFIX);
+  }
+  
+  protected Map<String,String> getEntries(Configuration conf, String prefix) {
+    Map<String,String> entries = new HashMap<String,String>();
+    
+    for (Entry<String,String> entry : conf) {
+      String key = entry.getKey();
+      if (key.startsWith(prefix)) {
+        entries.put(key, entry.getValue());
+      }
+    }
+    
+    return entries;
+  }
+  
+  
   @Override
   public void setLocation(String location, Job job) throws IOException {
     conf = job.getConfiguration();
     setLocationFromUri(location);
     
-    Map<String,String> entries = AccumuloInputFormat.getRelevantEntries(conf);
-
-    if (shouldSetInput(entries)) {
-      int sequence = AccumuloInputFormat.nextSequence(conf);
-      
-      // TODO Something more.. "certain".
-      if (conf.getBoolean(AccumuloInputFormat.class.getSimpleName() + ".configured." + sequence, false)) {
-        LOG.warn("InputFormat already configured for " + sequence);
-        return;
-        // throw new RuntimeException("Was provided sequence number which was already configured: " + sequence);
-      }
-      
-      AccumuloInputFormat.setInputInfo(conf, sequence, user, password.getBytes(), table, authorizations);
-      AccumuloInputFormat.setZooKeeperInstance(conf, sequence, inst, zookeepers);
-      if (columnFamilyColumnQualifierPairs.size() > 0) {
-        LOG.info("columns: " + columnFamilyColumnQualifierPairs);
-        AccumuloInputFormat.fetchColumns(conf, sequence, columnFamilyColumnQualifierPairs);
-      }
-      
-      Collection<Range> ranges = Collections.singleton(new Range(start, end));
-      
-      LOG.info("Scanning Accumulo for " + ranges);
-      
-      AccumuloInputFormat.setRanges(conf, sequence, ranges);
-      
-      configureInputFormat(conf, sequence);
+    Map<String,String> entries = getInputFormatEntries(conf);
+    
+    Exception e = new Exception("setLocation");
+    e.printStackTrace(System.out);
+    System.out.println(entries);
+    
+    for (String key : entries.keySet()) {
+      conf.unset(key);
+    }
+    
+    entries = getInputFormatEntries(conf);
+    System.out.println(entries);
+    
+    AccumuloInputFormat.setInputInfo(conf, user, password.getBytes(), table, authorizations);
+    AccumuloInputFormat.setZooKeeperInstance(conf, inst, zookeepers);
+    if (columnFamilyColumnQualifierPairs.size() > 0) {
+      LOG.info("columns: " + columnFamilyColumnQualifierPairs);
+      AccumuloInputFormat.fetchColumns(conf, columnFamilyColumnQualifierPairs);
     }
+    
+    Collection<Range> ranges = Collections.singleton(new Range(start, end));
+    
+    LOG.info("Scanning Accumulo for " + ranges + " for table " + table);
+    
+    AccumuloInputFormat.setRanges(conf, ranges);
+    
+    configureInputFormat(conf);
+    
+    entries = getInputFormatEntries(conf);
+    System.out.println(entries);
   }
   
-  protected void configureInputFormat(Configuration conf, int sequence) {
+  protected void configureInputFormat(Configuration conf) {
     
   }
   
-  protected void configureOutputFormat(Configuration conf, int sequence) {
+  protected void configureOutputFormat(Configuration conf) {
     
   }
   
@@ -279,242 +295,245 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
   public void setStoreLocation(String location, Job job) throws IOException {
     conf = job.getConfiguration();
     setLocationFromUri(location);
-    
-    Map<String,String> entries = AccumuloOutputFormat.getRelevantEntries(conf);
-    
-    if (shouldSetOutput(entries)) {
-      int sequence = AccumuloOutputFormat.nextSequence(conf);
-      
-      // TODO Something more.. "certain".
-      if (conf.getBoolean(AccumuloOutputFormat.class.getSimpleName() + ".configured." + sequence, false)) {
-        LOG.warn("OutputFormat already configured for " + sequence);
-        return;
-        // throw new RuntimeException("Was provided sequence number which was already configured: " + sequence);
-      }
-      
-      AccumuloOutputFormat.setOutputInfo(conf, sequence, user, password.getBytes(), true, table);
-      AccumuloOutputFormat.setZooKeeperInstance(conf, sequence, inst, zookeepers);
-      AccumuloOutputFormat.setMaxLatency(conf, sequence, maxLatency);
-      AccumuloOutputFormat.setMaxMutationBufferSize(conf, sequence, maxMutationBufferSize);
-      AccumuloOutputFormat.setMaxWriteThreads(conf, sequence, maxWriteThreads);
-      
-      configureOutputFormat(conf, sequence);
-    } else {
-      LOG.debug("Not setting configuration as it appears that it has already been set");
+//    
+//    Map<String,String> entries = AccumuloOutputFormat.getRelevantEntries(conf);
+//    
+//    Exception e = new Exception("setStoreLocation");
+//    e.printStackTrace(System.out);
+//    System.out.println(entries);
+//    
+//    for (String key : entries.keySet()) {
+//      conf.unset(key);
+//    }
+//    
+//    entries = AccumuloOutputFormat.getRelevantEntries(conf);
+//    System.out.println(entries);
+    if (conf.get(AccumuloOutputFormat.class.getSimpleName() + ".configured") == null) {
+      AccumuloOutputFormat.setOutputInfo(conf, user, password.getBytes(), true, table);
+      AccumuloOutputFormat.setZooKeeperInstance(conf, inst, zookeepers);
+      AccumuloOutputFormat.setMaxLatency(conf, maxLatency);
+      AccumuloOutputFormat.setMaxMutationBufferSize(conf, maxMutationBufferSize);
+      AccumuloOutputFormat.setMaxWriteThreads(conf, maxWriteThreads);
+      
+      LOG.info("Writing data to " + table);
+      
+      configureOutputFormat(conf);
     }
-  }
-  
-  private boolean shouldSetInput(Map<String,String> configEntries) {
-    Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries);
     
-    for (Map<String,String> group : groupedConfigEntries.values()) {
-      if (null != inst) {
-        if (!inst.equals(group.get(INPUT_PREFIX + ".instanceName"))) {
-          continue;
-        }
-      } else if (null != group.get(INPUT_PREFIX + ".instanceName")) {
-        continue;
-      }
-      
-      if (null != zookeepers) {
-        if (!zookeepers.equals(group.get(INPUT_PREFIX + ".zooKeepers"))) {
-          continue;
-        }
-      } else if (null != group.get(INPUT_PREFIX + ".zooKeepers")) {
-        continue;
-      }
-      
-      if (null != user) {
-        if (!user.equals(group.get(INPUT_PREFIX + ".username"))) {
-          continue;
-        }
-      } else if (null != group.get(INPUT_PREFIX + ".username")) {
-        continue;
-      }
-      
-      if (null != password) {
-        if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(INPUT_PREFIX + ".password"))) {
-          continue;
-        }
-      } else if (null != group.get(INPUT_PREFIX + ".password")) {
-        continue;
-      }
-      
-      if (null != table) {
-        if (!table.equals(group.get(INPUT_PREFIX + ".tablename"))) {
-          continue;
-        }
-      } else if (null != group.get(INPUT_PREFIX + ".tablename")) {
-        continue;
-      }
-      
-      if (null != authorizations) {
-        if (!authorizations.serialize().equals(group.get(INPUT_PREFIX + ".authorizations"))) {
-          continue;
-        }
-      } else if (null != group.get(INPUT_PREFIX + ".authorizations")) {
-        continue;
-      }
-      
-      String columnValues = group.get(INPUT_PREFIX + ".columns");
-      if (null != columnFamilyColumnQualifierPairs) {
-        StringBuilder expected = new StringBuilder(128);
-        for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
-          if (0 < expected.length()) {
-            expected.append(COMMA);
-          }
-          
-          expected.append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst()))));
-          if (column.getSecond() != null)
-            expected.append(COLON).append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond()))));
-        }
-        
-        if (!expected.toString().equals(columnValues)) {
-          continue;
-        }
-      } else if (null != columnValues){ 
-        continue;
-      }
-      
-      Range expected = new Range(start, end);
-      String serializedRanges = group.get(INPUT_PREFIX + ".ranges");
-      if (null != serializedRanges) {
-        try {
-          // We currently only support serializing one Range into the Configuration from this Storage class
-          ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serializedRanges.getBytes()));
-          Range range = new Range();
-          range.readFields(new DataInputStream(bais));
-          
-          if (!expected.equals(range)) {
-            continue;
-          }
-        } catch (IOException e) {
-          // Got an exception, they don't match
-          continue;
-        }
-      }
-      
-      
-      // We found a group of entries in the config which are (similar to) what
-      // we would have set.
-      return false;
-    }
-    
-    // We didn't find any entries that seemed to match, write the config
-    return true;
+//    entries = AccumuloOutputFormat.getRelevantEntries(conf);
+//    System.out.println(entries);
   }
   
-  private boolean shouldSetOutput(Map<String,String> configEntries) {
-    Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries);
-    
-    for (Map<String,String> group : groupedConfigEntries.values()) {
-      if (null != inst) {
-        if (!inst.equals(group.get(OUTPUT_PREFIX + ".instanceName"))) {
-          continue;
-        }
-      } else if (null != group.get(OUTPUT_PREFIX + ".instanceName")) {
-        continue;
-      }
-      
-      if (null != zookeepers) {
-        if (!zookeepers.equals(group.get(OUTPUT_PREFIX + ".zooKeepers"))) {
-          continue;
-        }
-      } else if (null != group.get(OUTPUT_PREFIX + ".zooKeepers")) {
-        continue;
-      }
-      
-      if (null != user) {
-        if (!user.equals(group.get(OUTPUT_PREFIX + ".username"))) {
-          continue;
-        }
-      } else if (null != group.get(OUTPUT_PREFIX + ".username")) {
-        continue;
-      }
-      
-      if (null != password) {
-        if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(OUTPUT_PREFIX + ".password"))) {
-          continue;
-        }
-      } else if (null != group.get(OUTPUT_PREFIX + ".password")) {
-        continue;
-      }
-      
-      if (null != table) {
-        if (!table.equals(group.get(OUTPUT_PREFIX + ".defaulttable"))) {
-          continue;
-        }
-      } else if (null != group.get(OUTPUT_PREFIX + ".defaulttable")) {
-        continue;
-      }
-      
-      String writeThreadsStr = group.get(OUTPUT_PREFIX + ".writethreads");
-      try {
-        if (null == writeThreadsStr || maxWriteThreads != Integer.parseInt(writeThreadsStr)) {
-            continue;
-          }
-      } catch (NumberFormatException e) {
-        // Wasn't a number, so it can't match what we were expecting
-        continue;
-      }
-      
-      String mutationBufferStr = group.get(OUTPUT_PREFIX + ".maxmemory");
-      try {
-        if (null == mutationBufferStr || maxMutationBufferSize != Long.parseLong(mutationBufferStr)) {
-            continue;
-          }
-      } catch (NumberFormatException e) {
-        // Wasn't a number, so it can't match what we were expecting
-        continue;
-      }
-      
-      String maxLatencyStr = group.get(OUTPUT_PREFIX + ".maxlatency");
-      try {
-        if (null == maxLatencyStr || maxLatency != Long.parseLong(maxLatencyStr)) {
-            continue;
-          }
-      } catch (NumberFormatException e) {
-        // Wasn't a number, so it can't match what we were expecting
-        continue;
-      }
-      
-      // We found a group of entries in the config which are (similar to) what
-      // we would have set.
-      return false;
-    }
-    
-    // We didn't find any entries that seemed to match, write the config
-    return true;
-  }
-  
-  private Map<Integer,Map<String,String>> permuteConfigEntries(Map<String,String> entries) {
-    Map<Integer,Map<String,String>> groupedEntries = new HashMap<Integer,Map<String,String>>();
-    for (Entry<String,String> entry : entries.entrySet()) {
-      final String key = entry.getKey(), value = entry.getValue();
-      
-      if (key.endsWith(SequencedFormatHelper.CONFIGURED_SEQUENCES)) {
-        continue;
-      }
-      
-      int index = key.lastIndexOf(PERIOD);
-      if (-1 != index) {
-        int group = Integer.parseInt(key.substring(index + 1));
-        String name = key.substring(0, index);
-        
-        Map<String,String> entriesInGroup = groupedEntries.get(group);
-        if (null == entriesInGroup) {
-          entriesInGroup = new HashMap<String,String>();
-          groupedEntries.put(group, entriesInGroup);
-        }
-        
-        entriesInGroup.put(name, value);
-      } else {
-        LOG.warn("Could not parse key: " + key);
-      }
-    }
-    
-    return groupedEntries;
-  }
+//  private boolean shouldSetInput(Map<String,String> configEntries) {
+//    Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries);
+//    
+//    for (Map<String,String> group : groupedConfigEntries.values()) {
+//      if (null != inst) {
+//        if (!inst.equals(group.get(INPUT_PREFIX + ".instanceName"))) {
+//          continue;
+//        }
+//      } else if (null != group.get(INPUT_PREFIX + ".instanceName")) {
+//        continue;
+//      }
+//      
+//      if (null != zookeepers) {
+//        if (!zookeepers.equals(group.get(INPUT_PREFIX + ".zooKeepers"))) {
+//          continue;
+//        }
+//      } else if (null != group.get(INPUT_PREFIX + ".zooKeepers")) {
+//        continue;
+//      }
+//      
+//      if (null != user) {
+//        if (!user.equals(group.get(INPUT_PREFIX + ".username"))) {
+//          continue;
+//        }
+//      } else if (null != group.get(INPUT_PREFIX + ".username")) {
+//        continue;
+//      }
+//      
+//      if (null != password) {
+//        if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(INPUT_PREFIX + ".password"))) {
+//          continue;
+//        }
+//      } else if (null != group.get(INPUT_PREFIX + ".password")) {
+//        continue;
+//      }
+//      
+//      if (null != table) {
+//        if (!table.equals(group.get(INPUT_PREFIX + ".tablename"))) {
+//          continue;
+//        }
+//      } else if (null != group.get(INPUT_PREFIX + ".tablename")) {
+//        continue;
+//      }
+//      
+//      if (null != authorizations) {
+//        if (!authorizations.serialize().equals(group.get(INPUT_PREFIX + ".authorizations"))) {
+//          continue;
+//        }
+//      } else if (null != group.get(INPUT_PREFIX + ".authorizations")) {
+//        continue;
+//      }
+//      
+//      String columnValues = group.get(INPUT_PREFIX + ".columns");
+//      if (null != columnFamilyColumnQualifierPairs) {
+//        StringBuilder expected = new StringBuilder(128);
+//        for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
+//          if (0 < expected.length()) {
+//            expected.append(COMMA);
+//          }
+//          
+//          expected.append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst()))));
+//          if (column.getSecond() != null)
+//            expected.append(COLON).append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond()))));
+//        }
+//        
+//        if (!expected.toString().equals(columnValues)) {
+//          continue;
+//        }
+//      } else if (null != columnValues) {
+//        continue;
+//      }
+//      
+//      Range expected = new Range(start, end);
+//      String serializedRanges = group.get(INPUT_PREFIX + ".ranges");
+//      if (null != serializedRanges) {
+//        try {
+//          // We currently only support serializing one Range into the Configuration from this Storage class
+//          ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serializedRanges.getBytes()));
+//          Range range = new Range();
+//          range.readFields(new DataInputStream(bais));
+//          
+//          if (!expected.equals(range)) {
+//            continue;
+//          }
+//        } catch (IOException e) {
+//          // Got an exception, they don't match
+//          continue;
+//        }
+//      }
+//      
+//      // We found a group of entries in the config which are (similar to) what
+//      // we would have set.
+//      return false;
+//    }
+//    
+//    // We didn't find any entries that seemed to match, write the config
+//    return true;
+//  }
+//  
+//  private boolean shouldSetOutput(Map<String,String> configEntries) {
+//    Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries);
+//    
+//    for (Map<String,String> group : groupedConfigEntries.values()) {
+//      if (null != inst) {
+//        if (!inst.equals(group.get(OUTPUT_PREFIX + ".instanceName"))) {
+//          continue;
+//        }
+//      } else if (null != group.get(OUTPUT_PREFIX + ".instanceName")) {
+//        continue;
+//      }
+//      
+//      if (null != zookeepers) {
+//        if (!zookeepers.equals(group.get(OUTPUT_PREFIX + ".zooKeepers"))) {
+//          continue;
+//        }
+//      } else if (null != group.get(OUTPUT_PREFIX + ".zooKeepers")) {
+//        continue;
+//      }
+//      
+//      if (null != user) {
+//        if (!user.equals(group.get(OUTPUT_PREFIX + ".username"))) {
+//          continue;
+//        }
+//      } else if (null != group.get(OUTPUT_PREFIX + ".username")) {
+//        continue;
+//      }
+//      
+//      if (null != password) {
+//        if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(OUTPUT_PREFIX + ".password"))) {
+//          continue;
+//        }
+//      } else if (null != group.get(OUTPUT_PREFIX + ".password")) {
+//        continue;
+//      }
+//      
+//      if (null != table) {
+//        if (!table.equals(group.get(OUTPUT_PREFIX + ".defaulttable"))) {
+//          continue;
+//        }
+//      } else if (null != group.get(OUTPUT_PREFIX + ".defaulttable")) {
+//        continue;
+//      }
+//      
+//      String writeThreadsStr = group.get(OUTPUT_PREFIX + ".writethreads");
+//      try {
+//        if (null == writeThreadsStr || maxWriteThreads != Integer.parseInt(writeThreadsStr)) {
+//          continue;
+//        }
+//      } catch (NumberFormatException e) {
+//        // Wasn't a number, so it can't match what we were expecting
+//        continue;
+//      }
+//      
+//      String mutationBufferStr = group.get(OUTPUT_PREFIX + ".maxmemory");
+//      try {
+//        if (null == mutationBufferStr || maxMutationBufferSize != Long.parseLong(mutationBufferStr)) {
+//          continue;
+//        }
+//      } catch (NumberFormatException e) {
+//        // Wasn't a number, so it can't match what we were expecting
+//        continue;
+//      }
+//      
+//      String maxLatencyStr = group.get(OUTPUT_PREFIX + ".maxlatency");
+//      try {
+//        if (null == maxLatencyStr || maxLatency != Long.parseLong(maxLatencyStr)) {
+//          continue;
+//        }
+//      } catch (NumberFormatException e) {
+//        // Wasn't a number, so it can't match what we were expecting
+//        continue;
+//      }
+//      
+//      // We found a group of entries in the config which are (similar to) what
+//      // we would have set.
+//      return false;
+//    }
+//    
+//    // We didn't find any entries that seemed to match, write the config
+//    return true;
+//  }
+//  
+//  private Map<Integer,Map<String,String>> permuteConfigEntries(Map<String,String> entries) {
+//    Map<Integer,Map<String,String>> groupedEntries = new HashMap<Integer,Map<String,String>>();
+//    for (Entry<String,String> entry : entries.entrySet()) {
+//      final String key = entry.getKey(), value = entry.getValue();
+//      
+//      if (key.endsWith(SequencedFormatHelper.CONFIGURED_SEQUENCES)) {
+//        continue;
+//      }
+//      
+//      int index = key.lastIndexOf(PERIOD);
+//      if (-1 != index) {
+//        int group = Integer.parseInt(key.substring(index + 1));
+//        String name = key.substring(0, index);
+//        
+//        Map<String,String> entriesInGroup = groupedEntries.get(group);
+//        if (null == entriesInGroup) {
+//          entriesInGroup = new HashMap<String,String>();
+//          groupedEntries.put(group, entriesInGroup);
+//        }
+//        
+//        entriesInGroup.put(name, value);
+//      } else {
+//        LOG.warn("Could not parse key: " + key);
+//      }
+//    }
+//    
+//    return groupedEntries;
+//  }
   
   @SuppressWarnings("rawtypes")
   public OutputFormat getOutputFormat() {

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/63d29d4d/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index bd43dce..8e9cfef 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -73,7 +73,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
   
   @Override
   protected Tuple getTuple(Key key, Value value) throws IOException {
-    
+//    System.out.println(key);
     SortedMap<Key,Value> rowKVs = WholeRowIterator.decodeRow(key, value);
     
     List<Object> tupleEntries = Lists.newLinkedList();
@@ -142,8 +142,8 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
   }
   
   @Override
-  protected void configureInputFormat(Configuration conf, int sequence) {
-    AccumuloInputFormat.addIterator(conf, sequence, new IteratorSetting(50, WholeRowIterator.class));
+  protected void configureInputFormat(Configuration conf) {
+    AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class));
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/63d29d4d/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
index 499558f..a6db638 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
@@ -84,8 +84,8 @@ public class AccumuloWholeRowStorage extends AbstractAccumuloStorage {
   }
   
   @Override
-  protected void configureInputFormat(Configuration conf, int sequence) {
-    AccumuloInputFormat.addIterator(conf, sequence, new IteratorSetting(50, WholeRowIterator.class));
+  protected void configureInputFormat(Configuration conf) {
+    AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class));
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/63d29d4d/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
index bc886ec..1b5b81a 100644
--- a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
@@ -44,11 +44,10 @@ public class AbstractAccumuloStorageTest {
     
     Job expected = new Job();
     Configuration expectedConf = expected.getConfiguration();
-    final int sequence = AccumuloInputFormat.nextSequence(expectedConf);
-    AccumuloInputFormat.setInputInfo(expectedConf, sequence, user, password.getBytes(), table, authorizations);
-    AccumuloInputFormat.setZooKeeperInstance(expectedConf, sequence, inst, zookeepers);
-    AccumuloInputFormat.fetchColumns(expectedConf, sequence, columnFamilyColumnQualifierPairs);
-    AccumuloInputFormat.setRanges(expectedConf, sequence, ranges);
+    AccumuloInputFormat.setInputInfo(expectedConf, user, password.getBytes(), table, authorizations);
+    AccumuloInputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers);
+    AccumuloInputFormat.fetchColumns(expectedConf, columnFamilyColumnQualifierPairs);
+    AccumuloInputFormat.setRanges(expectedConf, ranges);
     return expected;
   }
   
@@ -75,12 +74,11 @@ public class AbstractAccumuloStorageTest {
       int maxWriteLatencyMS) throws IOException {
     Job expected = new Job();
     Configuration expectedConf = expected.getConfiguration();
-    final int sequence = AccumuloOutputFormat.nextSequence(expectedConf);
-    AccumuloOutputFormat.setOutputInfo(expectedConf, sequence,user, password.getBytes(), true, table);
-    AccumuloOutputFormat.setZooKeeperInstance(expectedConf, sequence,inst, zookeepers);
-    AccumuloOutputFormat.setMaxLatency(expectedConf, sequence,maxWriteLatencyMS);
-    AccumuloOutputFormat.setMaxMutationBufferSize(expectedConf, sequence,maxWriteBufferSize);
-    AccumuloOutputFormat.setMaxWriteThreads(expectedConf, sequence,writeThreads);
+    AccumuloOutputFormat.setOutputInfo(expectedConf, user, password.getBytes(), true, table);
+    AccumuloOutputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers);
+    AccumuloOutputFormat.setMaxLatency(expectedConf, maxWriteLatencyMS);
+    AccumuloOutputFormat.setMaxMutationBufferSize(expectedConf, maxWriteBufferSize);
+    AccumuloOutputFormat.setMaxWriteThreads(expectedConf, writeThreads);
     
     return expected;
   }

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/63d29d4d/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
index b57c7ba..690d86c 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
@@ -56,12 +56,9 @@ public class AccumuloWholeRowStorageTest {
     s.setLocation(test.getDefaultLoadLocation(), actual);
     Configuration actualConf = actual.getConfiguration();
     
-    // A little brittle. We know this is the sequence number used when we create the DefaultExpectedLoadJob()
-    final int sequence = 1;
-    
     Job expected = test.getDefaultExpectedLoadJob();
     Configuration expectedConf = expected.getConfiguration();
-    AccumuloInputFormat.addIterator(expectedConf, sequence, new IteratorSetting(50, WholeRowIterator.class));
+    AccumuloInputFormat.addIterator(expectedConf, new IteratorSetting(50, WholeRowIterator.class));
     
     TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
   }


[2/2] git commit: ACCUMULO-1783 Clean up now dead or unnecessary code.

Posted by el...@apache.org.
ACCUMULO-1783 Clean up now dead or unnecessary code.


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/4160c161
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/4160c161
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/4160c161

Branch: refs/heads/ACCUMULO-1783
Commit: 4160c1615010a626beedc318fcaaaef06a258068
Parents: 63d29d4
Author: Josh Elser <el...@apache.org>
Authored: Fri Nov 8 21:27:41 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Fri Nov 8 21:27:41 2013 -0500

----------------------------------------------------------------------
 .../accumulo/pig/AbstractAccumuloStorage.java   | 398 ++++---------------
 1 file changed, 84 insertions(+), 314 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/4160c161/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 5faf6c6..a829d4a 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -73,14 +73,14 @@ import org.joda.time.DateTime;
  */
 public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreFuncInterface {
   private static final Log LOG = LogFactory.getLog(AbstractAccumuloStorage.class);
-  
-  private static final String COLON = ":", COMMA = ",", PERIOD = ".";
-  private static final String INPUT_PREFIX = AccumuloInputFormat.class.getSimpleName(), OUTPUT_PREFIX = AccumuloOutputFormat.class.getSimpleName();
-  
+
+  private static final String COLON = ":", COMMA = ",";
+  private static final String INPUT_PREFIX = AccumuloInputFormat.class.getSimpleName();
+
   private Configuration conf;
   private RecordReader<Key,Value> reader;
   private RecordWriter<Text,Mutation> writer;
-  
+
   String inst;
   String zookeepers;
   String user;
@@ -90,27 +90,27 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
   String auths;
   Authorizations authorizations;
   List<Pair<Text,Text>> columnFamilyColumnQualifierPairs = new LinkedList<Pair<Text,Text>>();
-  
+
   String start = null;
   String end = null;
-  
+
   int maxWriteThreads = 10;
   long maxMutationBufferSize = 10 * 1000 * 1000;
   int maxLatency = 10 * 1000;
-  
+
   protected LoadStoreCaster caster;
   protected ResourceSchema schema;
   protected String contextSignature = null;
-  
+
   public AbstractAccumuloStorage() {}
-  
+
   @Override
   public Tuple getNext() throws IOException {
     try {
       // load the next pair
       if (!reader.nextKeyValue())
         return null;
-      
+
       Key key = (Key) reader.getCurrentKey();
       Value value = (Value) reader.getCurrentValue();
       assert key != null && value != null;
@@ -119,21 +119,21 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
       throw new IOException(e.getMessage());
     }
   }
-  
+
   protected abstract Tuple getTuple(Key key, Value value) throws IOException;
-  
+
   @Override
   @SuppressWarnings("rawtypes")
   public InputFormat getInputFormat() {
     return new AccumuloInputFormat();
   }
-  
+
   @Override
   @SuppressWarnings({"unchecked", "rawtypes"})
   public void prepareToRead(RecordReader reader, PigSplit split) {
     this.reader = reader;
   }
-  
+
   private void setLocationFromUri(String location) throws IOException {
     // ex:
     // accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&fetch_columns=col1:cq1,col2:cq2&start=abc&end=z
@@ -172,13 +172,13 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
       String[] parts = urlParts[0].split("/+");
       table = parts[1];
       tableName = new Text(table);
-      
+
       if (auths == null || auths.equals("")) {
         authorizations = new Authorizations();
       } else {
         authorizations = new Authorizations(auths.split(COMMA));
       }
-      
+
       if (!StringUtils.isEmpty(columns)) {
         for (String cfCq : columns.split(COMMA)) {
           if (cfCq.contains(COLON)) {
@@ -189,7 +189,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
           }
         }
       }
-      
+
     } catch (Exception e) {
       throw new IOException(
           "Expected 'accumulo://<table>[?instance=<instanceName>&user=<user>&password=<password>&zookeepers=<zookeepers>&auths=<authorizations>&"
@@ -197,356 +197,126 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
               + e.getMessage());
     }
   }
-  
+
   protected RecordWriter<Text,Mutation> getWriter() {
     return writer;
   }
-  
+
   protected Map<String,String> getInputFormatEntries(Configuration conf) {
     return getEntries(conf, INPUT_PREFIX);
   }
-  
+
   protected Map<String,String> getEntries(Configuration conf, String prefix) {
     Map<String,String> entries = new HashMap<String,String>();
-    
+
     for (Entry<String,String> entry : conf) {
       String key = entry.getKey();
       if (key.startsWith(prefix)) {
         entries.put(key, entry.getValue());
       }
     }
-    
+
     return entries;
   }
-  
-  
+
   @Override
   public void setLocation(String location, Job job) throws IOException {
     conf = job.getConfiguration();
     setLocationFromUri(location);
-    
+
     Map<String,String> entries = getInputFormatEntries(conf);
-    
-    Exception e = new Exception("setLocation");
-    e.printStackTrace(System.out);
-    System.out.println(entries);
-    
+
     for (String key : entries.keySet()) {
       conf.unset(key);
     }
-    
-    entries = getInputFormatEntries(conf);
-    System.out.println(entries);
-    
+
     AccumuloInputFormat.setInputInfo(conf, user, password.getBytes(), table, authorizations);
     AccumuloInputFormat.setZooKeeperInstance(conf, inst, zookeepers);
     if (columnFamilyColumnQualifierPairs.size() > 0) {
       LOG.info("columns: " + columnFamilyColumnQualifierPairs);
       AccumuloInputFormat.fetchColumns(conf, columnFamilyColumnQualifierPairs);
     }
-    
+
     Collection<Range> ranges = Collections.singleton(new Range(start, end));
-    
+
     LOG.info("Scanning Accumulo for " + ranges + " for table " + table);
-    
+
     AccumuloInputFormat.setRanges(conf, ranges);
-    
+
     configureInputFormat(conf);
-    
-    entries = getInputFormatEntries(conf);
-    System.out.println(entries);
-  }
-  
-  protected void configureInputFormat(Configuration conf) {
-    
   }
-  
-  protected void configureOutputFormat(Configuration conf) {
-    
-  }
-  
+
+  /**
+   * Method to allow specific implementations to add more elements to the Configuration for reading data from Accumulo.
+   * 
+   * @param conf
+   */
+  protected void configureInputFormat(Configuration conf) {}
+
+  /**
+   * Method to allow specific implementations to add more elements to the Configuration for writing data to Accumulo.
+   * 
+   * @param conf
+   */
+  protected void configureOutputFormat(Configuration conf) {}
+
   @Override
   public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
     return location;
   }
-  
+
   @Override
   public void setUDFContextSignature(String signature) {
     this.contextSignature = signature;
   }
-  
+
   /* StoreFunc methods */
   public void setStoreFuncUDFContextSignature(String signature) {
     this.contextSignature = signature;
-    
+
   }
-  
+
   /**
    * Returns UDFProperties based on <code>contextSignature</code>.
    */
   protected Properties getUDFProperties() {
     return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] {contextSignature});
   }
-  
+
   public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
     return relativeToAbsolutePath(location, curDir);
   }
-  
+
   public void setStoreLocation(String location, Job job) throws IOException {
     conf = job.getConfiguration();
     setLocationFromUri(location);
-//    
-//    Map<String,String> entries = AccumuloOutputFormat.getRelevantEntries(conf);
-//    
-//    Exception e = new Exception("setStoreLocation");
-//    e.printStackTrace(System.out);
-//    System.out.println(entries);
-//    
-//    for (String key : entries.keySet()) {
-//      conf.unset(key);
-//    }
-//    
-//    entries = AccumuloOutputFormat.getRelevantEntries(conf);
-//    System.out.println(entries);
+    
+    // TODO If Pig ever uses a MultipleOutputs-esque construct, this approach will fall apart
     if (conf.get(AccumuloOutputFormat.class.getSimpleName() + ".configured") == null) {
       AccumuloOutputFormat.setOutputInfo(conf, user, password.getBytes(), true, table);
       AccumuloOutputFormat.setZooKeeperInstance(conf, inst, zookeepers);
       AccumuloOutputFormat.setMaxLatency(conf, maxLatency);
       AccumuloOutputFormat.setMaxMutationBufferSize(conf, maxMutationBufferSize);
       AccumuloOutputFormat.setMaxWriteThreads(conf, maxWriteThreads);
-      
+
       LOG.info("Writing data to " + table);
-      
+
       configureOutputFormat(conf);
     }
-    
-//    entries = AccumuloOutputFormat.getRelevantEntries(conf);
-//    System.out.println(entries);
   }
-  
-//  private boolean shouldSetInput(Map<String,String> configEntries) {
-//    Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries);
-//    
-//    for (Map<String,String> group : groupedConfigEntries.values()) {
-//      if (null != inst) {
-//        if (!inst.equals(group.get(INPUT_PREFIX + ".instanceName"))) {
-//          continue;
-//        }
-//      } else if (null != group.get(INPUT_PREFIX + ".instanceName")) {
-//        continue;
-//      }
-//      
-//      if (null != zookeepers) {
-//        if (!zookeepers.equals(group.get(INPUT_PREFIX + ".zooKeepers"))) {
-//          continue;
-//        }
-//      } else if (null != group.get(INPUT_PREFIX + ".zooKeepers")) {
-//        continue;
-//      }
-//      
-//      if (null != user) {
-//        if (!user.equals(group.get(INPUT_PREFIX + ".username"))) {
-//          continue;
-//        }
-//      } else if (null != group.get(INPUT_PREFIX + ".username")) {
-//        continue;
-//      }
-//      
-//      if (null != password) {
-//        if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(INPUT_PREFIX + ".password"))) {
-//          continue;
-//        }
-//      } else if (null != group.get(INPUT_PREFIX + ".password")) {
-//        continue;
-//      }
-//      
-//      if (null != table) {
-//        if (!table.equals(group.get(INPUT_PREFIX + ".tablename"))) {
-//          continue;
-//        }
-//      } else if (null != group.get(INPUT_PREFIX + ".tablename")) {
-//        continue;
-//      }
-//      
-//      if (null != authorizations) {
-//        if (!authorizations.serialize().equals(group.get(INPUT_PREFIX + ".authorizations"))) {
-//          continue;
-//        }
-//      } else if (null != group.get(INPUT_PREFIX + ".authorizations")) {
-//        continue;
-//      }
-//      
-//      String columnValues = group.get(INPUT_PREFIX + ".columns");
-//      if (null != columnFamilyColumnQualifierPairs) {
-//        StringBuilder expected = new StringBuilder(128);
-//        for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
-//          if (0 < expected.length()) {
-//            expected.append(COMMA);
-//          }
-//          
-//          expected.append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst()))));
-//          if (column.getSecond() != null)
-//            expected.append(COLON).append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond()))));
-//        }
-//        
-//        if (!expected.toString().equals(columnValues)) {
-//          continue;
-//        }
-//      } else if (null != columnValues) {
-//        continue;
-//      }
-//      
-//      Range expected = new Range(start, end);
-//      String serializedRanges = group.get(INPUT_PREFIX + ".ranges");
-//      if (null != serializedRanges) {
-//        try {
-//          // We currently only support serializing one Range into the Configuration from this Storage class
-//          ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serializedRanges.getBytes()));
-//          Range range = new Range();
-//          range.readFields(new DataInputStream(bais));
-//          
-//          if (!expected.equals(range)) {
-//            continue;
-//          }
-//        } catch (IOException e) {
-//          // Got an exception, they don't match
-//          continue;
-//        }
-//      }
-//      
-//      // We found a group of entries in the config which are (similar to) what
-//      // we would have set.
-//      return false;
-//    }
-//    
-//    // We didn't find any entries that seemed to match, write the config
-//    return true;
-//  }
-//  
-//  private boolean shouldSetOutput(Map<String,String> configEntries) {
-//    Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries);
-//    
-//    for (Map<String,String> group : groupedConfigEntries.values()) {
-//      if (null != inst) {
-//        if (!inst.equals(group.get(OUTPUT_PREFIX + ".instanceName"))) {
-//          continue;
-//        }
-//      } else if (null != group.get(OUTPUT_PREFIX + ".instanceName")) {
-//        continue;
-//      }
-//      
-//      if (null != zookeepers) {
-//        if (!zookeepers.equals(group.get(OUTPUT_PREFIX + ".zooKeepers"))) {
-//          continue;
-//        }
-//      } else if (null != group.get(OUTPUT_PREFIX + ".zooKeepers")) {
-//        continue;
-//      }
-//      
-//      if (null != user) {
-//        if (!user.equals(group.get(OUTPUT_PREFIX + ".username"))) {
-//          continue;
-//        }
-//      } else if (null != group.get(OUTPUT_PREFIX + ".username")) {
-//        continue;
-//      }
-//      
-//      if (null != password) {
-//        if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(OUTPUT_PREFIX + ".password"))) {
-//          continue;
-//        }
-//      } else if (null != group.get(OUTPUT_PREFIX + ".password")) {
-//        continue;
-//      }
-//      
-//      if (null != table) {
-//        if (!table.equals(group.get(OUTPUT_PREFIX + ".defaulttable"))) {
-//          continue;
-//        }
-//      } else if (null != group.get(OUTPUT_PREFIX + ".defaulttable")) {
-//        continue;
-//      }
-//      
-//      String writeThreadsStr = group.get(OUTPUT_PREFIX + ".writethreads");
-//      try {
-//        if (null == writeThreadsStr || maxWriteThreads != Integer.parseInt(writeThreadsStr)) {
-//          continue;
-//        }
-//      } catch (NumberFormatException e) {
-//        // Wasn't a number, so it can't match what we were expecting
-//        continue;
-//      }
-//      
-//      String mutationBufferStr = group.get(OUTPUT_PREFIX + ".maxmemory");
-//      try {
-//        if (null == mutationBufferStr || maxMutationBufferSize != Long.parseLong(mutationBufferStr)) {
-//          continue;
-//        }
-//      } catch (NumberFormatException e) {
-//        // Wasn't a number, so it can't match what we were expecting
-//        continue;
-//      }
-//      
-//      String maxLatencyStr = group.get(OUTPUT_PREFIX + ".maxlatency");
-//      try {
-//        if (null == maxLatencyStr || maxLatency != Long.parseLong(maxLatencyStr)) {
-//          continue;
-//        }
-//      } catch (NumberFormatException e) {
-//        // Wasn't a number, so it can't match what we were expecting
-//        continue;
-//      }
-//      
-//      // We found a group of entries in the config which are (similar to) what
-//      // we would have set.
-//      return false;
-//    }
-//    
-//    // We didn't find any entries that seemed to match, write the config
-//    return true;
-//  }
-//  
-//  private Map<Integer,Map<String,String>> permuteConfigEntries(Map<String,String> entries) {
-//    Map<Integer,Map<String,String>> groupedEntries = new HashMap<Integer,Map<String,String>>();
-//    for (Entry<String,String> entry : entries.entrySet()) {
-//      final String key = entry.getKey(), value = entry.getValue();
-//      
-//      if (key.endsWith(SequencedFormatHelper.CONFIGURED_SEQUENCES)) {
-//        continue;
-//      }
-//      
-//      int index = key.lastIndexOf(PERIOD);
-//      if (-1 != index) {
-//        int group = Integer.parseInt(key.substring(index + 1));
-//        String name = key.substring(0, index);
-//        
-//        Map<String,String> entriesInGroup = groupedEntries.get(group);
-//        if (null == entriesInGroup) {
-//          entriesInGroup = new HashMap<String,String>();
-//          groupedEntries.put(group, entriesInGroup);
-//        }
-//        
-//        entriesInGroup.put(name, value);
-//      } else {
-//        LOG.warn("Could not parse key: " + key);
-//      }
-//    }
-//    
-//    return groupedEntries;
-//  }
-  
+
   @SuppressWarnings("rawtypes")
   public OutputFormat getOutputFormat() {
     return new AccumuloOutputFormat();
   }
-  
+
   @SuppressWarnings({"rawtypes", "unchecked"})
   public void prepareToWrite(RecordWriter writer) {
     this.writer = writer;
   }
-  
+
   public abstract Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException;
-  
+
   public void putNext(Tuple tuple) throws ExecException, IOException {
     Collection<Mutation> muts = getMutations(tuple);
     for (Mutation mut : muts) {
@@ -557,11 +327,11 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
       }
     }
   }
-  
+
   public void cleanupOnFailure(String failure, Job job) {}
-  
+
   public void cleanupOnSuccess(String location, Job job) {}
-  
+
   @Override
   public void checkSchema(ResourceSchema s) throws IOException {
     if (!(caster instanceof LoadStoreCaster)) {
@@ -571,40 +341,40 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
     schema = s;
     getUDFProperties().setProperty(contextSignature + "_schema", ObjectSerializer.serialize(schema));
   }
-  
+
   protected Text tupleToText(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
     Object o = tuple.get(i);
     byte type = schemaToType(o, i, fieldSchemas);
-    
+
     return objToText(o, type);
   }
-  
+
   protected Text objectToText(Object o, ResourceFieldSchema fieldSchema) throws IOException {
     byte type = schemaToType(o, fieldSchema);
-    
+
     return objToText(o, type);
   }
-  
+
   protected byte schemaToType(Object o, ResourceFieldSchema fieldSchema) {
     return (fieldSchema == null) ? DataType.findType(o) : fieldSchema.getType();
   }
-  
+
   protected byte schemaToType(Object o, int i, ResourceFieldSchema[] fieldSchemas) {
     return (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
   }
-  
+
   protected byte[] tupleToBytes(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
     Object o = tuple.get(i);
     byte type = schemaToType(o, i, fieldSchemas);
-    
+
     return objToBytes(o, type);
-    
+
   }
-  
+
   protected long objToLong(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
     Object o = tuple.get(i);
     byte type = schemaToType(o, i, fieldSchemas);
-    
+
     switch (type) {
       case DataType.LONG:
         return (Long) o;
@@ -629,13 +399,13 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
       case DataType.BIGINTEGER:
         BigInteger bigintTimestamp = (BigInteger) o;
         long longTimestamp = bigintTimestamp.longValue();
-        
+
         BigInteger recreatedTimestamp = BigInteger.valueOf(longTimestamp);
-        
+
         if (!recreatedTimestamp.equals(bigintTimestamp)) {
           LOG.warn("Downcasting BigInteger into Long results in a change of the original value. Was " + bigintTimestamp + " but is now " + longTimestamp);
         }
-        
+
         return longTimestamp;
       case DataType.BIGDECIMAL:
         BigDecimal bigdecimalTimestamp = (BigDecimal) o;
@@ -658,21 +428,21 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
       default:
         LOG.error("Could not convert " + o + " of class " + o.getClass() + " into long.");
         throw new IOException("Could not convert " + o.getClass() + " into long");
-        
+
     }
   }
-  
+
   protected Text objToText(Object o, byte type) throws IOException {
     byte[] bytes = objToBytes(o, type);
-    
+
     if (null == bytes) {
       LOG.warn("Creating empty text from null value");
       return new Text();
     }
-    
+
     return new Text(bytes);
   }
-  
+
   @SuppressWarnings("unchecked")
   protected byte[] objToBytes(Object o, byte type) throws IOException {
     if (o == null)
@@ -700,12 +470,12 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
         return caster.toBytes((Boolean) o);
       case DataType.DATETIME:
         return caster.toBytes((DateTime) o);
-        
+
         // The type conversion here is unchecked.
         // Relying on DataType.findType to do the right thing.
       case DataType.MAP:
         return caster.toBytes((Map<String,Object>) o);
-        
+
       case DataType.NULL:
         return null;
       case DataType.TUPLE: