Posted to commits@accumulo.apache.org by el...@apache.org on 2013/11/09 04:37:26 UTC

[2/2] git commit: ACCUMULO-1783 Clean up now-dead or unnecessary code.

ACCUMULO-1783 Clean up now-dead or unnecessary code.


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/4160c161
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/4160c161
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/4160c161

Branch: refs/heads/ACCUMULO-1783
Commit: 4160c1615010a626beedc318fcaaaef06a258068
Parents: 63d29d4
Author: Josh Elser <el...@apache.org>
Authored: Fri Nov 8 21:27:41 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Fri Nov 8 21:27:41 2013 -0500

----------------------------------------------------------------------
 .../accumulo/pig/AbstractAccumuloStorage.java   | 398 ++++---------------
 1 file changed, 84 insertions(+), 314 deletions(-)
----------------------------------------------------------------------
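
For context (not part of the patch below): after this cleanup, AbstractAccumuloStorage keeps two no-op hooks, configureInputFormat(Configuration) and configureOutputFormat(Configuration), plus the abstract getTuple(Key, Value) and getMutations(Tuple) methods, as the extension points for concrete storage implementations. The following is a minimal, hypothetical subclass sketching how those hooks might be filled in; the class name, column names, and configuration key are illustrative only and do not come from this repository.

package org.apache.accumulo.pig;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

// Hypothetical example subclass; not part of accumulo-pig or of this commit.
public class ExampleAccumuloStorage extends AbstractAccumuloStorage {

  @Override
  protected void configureInputFormat(Configuration conf) {
    // Implementation-specific settings can be added before the scan is configured.
    // The property name here is a made-up example.
    conf.set("example.pig.accumulo.setting", "value");
  }

  @Override
  protected Tuple getTuple(Key key, Value value) throws IOException {
    // Turn each Accumulo entry into a two-field tuple: (row, value).
    Tuple tuple = TupleFactory.getInstance().newTuple(2);
    tuple.set(0, key.getRow().toString());
    tuple.set(1, new String(value.get()));
    return tuple;
  }

  @Override
  public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
    // Write the second field into column example:value of the row named by the first field.
    Mutation mutation = new Mutation(new Text(tuple.get(0).toString()));
    mutation.put(new Text("example"), new Text("value"),
        new Value(tuple.get(1).toString().getBytes()));
    return Collections.singleton(mutation);
  }
}
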


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/4160c161/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 5faf6c6..a829d4a 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -73,14 +73,14 @@ import org.joda.time.DateTime;
  */
 public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreFuncInterface {
   private static final Log LOG = LogFactory.getLog(AbstractAccumuloStorage.class);
-  
-  private static final String COLON = ":", COMMA = ",", PERIOD = ".";
-  private static final String INPUT_PREFIX = AccumuloInputFormat.class.getSimpleName(), OUTPUT_PREFIX = AccumuloOutputFormat.class.getSimpleName();
-  
+
+  private static final String COLON = ":", COMMA = ",";
+  private static final String INPUT_PREFIX = AccumuloInputFormat.class.getSimpleName();
+
   private Configuration conf;
   private RecordReader<Key,Value> reader;
   private RecordWriter<Text,Mutation> writer;
-  
+
   String inst;
   String zookeepers;
   String user;
@@ -90,27 +90,27 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
   String auths;
   Authorizations authorizations;
   List<Pair<Text,Text>> columnFamilyColumnQualifierPairs = new LinkedList<Pair<Text,Text>>();
-  
+
   String start = null;
   String end = null;
-  
+
   int maxWriteThreads = 10;
   long maxMutationBufferSize = 10 * 1000 * 1000;
   int maxLatency = 10 * 1000;
-  
+
   protected LoadStoreCaster caster;
   protected ResourceSchema schema;
   protected String contextSignature = null;
-  
+
   public AbstractAccumuloStorage() {}
-  
+
   @Override
   public Tuple getNext() throws IOException {
     try {
       // load the next pair
       if (!reader.nextKeyValue())
         return null;
-      
+
       Key key = (Key) reader.getCurrentKey();
       Value value = (Value) reader.getCurrentValue();
       assert key != null && value != null;
@@ -119,21 +119,21 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
       throw new IOException(e.getMessage());
     }
   }
-  
+
   protected abstract Tuple getTuple(Key key, Value value) throws IOException;
-  
+
   @Override
   @SuppressWarnings("rawtypes")
   public InputFormat getInputFormat() {
     return new AccumuloInputFormat();
   }
-  
+
   @Override
   @SuppressWarnings({"unchecked", "rawtypes"})
   public void prepareToRead(RecordReader reader, PigSplit split) {
     this.reader = reader;
   }
-  
+
   private void setLocationFromUri(String location) throws IOException {
     // ex:
     // accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&fetch_columns=col1:cq1,col2:cq2&start=abc&end=z
@@ -172,13 +172,13 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
       String[] parts = urlParts[0].split("/+");
       table = parts[1];
       tableName = new Text(table);
-      
+
       if (auths == null || auths.equals("")) {
         authorizations = new Authorizations();
       } else {
         authorizations = new Authorizations(auths.split(COMMA));
       }
-      
+
       if (!StringUtils.isEmpty(columns)) {
         for (String cfCq : columns.split(COMMA)) {
           if (cfCq.contains(COLON)) {
@@ -189,7 +189,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
           }
         }
       }
-      
+
     } catch (Exception e) {
       throw new IOException(
           "Expected 'accumulo://<table>[?instance=<instanceName>&user=<user>&password=<password>&zookeepers=<zookeepers>&auths=<authorizations>&"
@@ -197,356 +197,126 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
               + e.getMessage());
     }
   }
-  
+
   protected RecordWriter<Text,Mutation> getWriter() {
     return writer;
   }
-  
+
   protected Map<String,String> getInputFormatEntries(Configuration conf) {
     return getEntries(conf, INPUT_PREFIX);
   }
-  
+
   protected Map<String,String> getEntries(Configuration conf, String prefix) {
     Map<String,String> entries = new HashMap<String,String>();
-    
+
     for (Entry<String,String> entry : conf) {
       String key = entry.getKey();
       if (key.startsWith(prefix)) {
         entries.put(key, entry.getValue());
       }
     }
-    
+
     return entries;
   }
-  
-  
+
   @Override
   public void setLocation(String location, Job job) throws IOException {
     conf = job.getConfiguration();
     setLocationFromUri(location);
-    
+
     Map<String,String> entries = getInputFormatEntries(conf);
-    
-    Exception e = new Exception("setLocation");
-    e.printStackTrace(System.out);
-    System.out.println(entries);
-    
+
     for (String key : entries.keySet()) {
       conf.unset(key);
     }
-    
-    entries = getInputFormatEntries(conf);
-    System.out.println(entries);
-    
+
     AccumuloInputFormat.setInputInfo(conf, user, password.getBytes(), table, authorizations);
     AccumuloInputFormat.setZooKeeperInstance(conf, inst, zookeepers);
     if (columnFamilyColumnQualifierPairs.size() > 0) {
       LOG.info("columns: " + columnFamilyColumnQualifierPairs);
       AccumuloInputFormat.fetchColumns(conf, columnFamilyColumnQualifierPairs);
     }
-    
+
     Collection<Range> ranges = Collections.singleton(new Range(start, end));
-    
+
     LOG.info("Scanning Accumulo for " + ranges + " for table " + table);
-    
+
     AccumuloInputFormat.setRanges(conf, ranges);
-    
+
     configureInputFormat(conf);
-    
-    entries = getInputFormatEntries(conf);
-    System.out.println(entries);
-  }
-  
-  protected void configureInputFormat(Configuration conf) {
-    
   }
-  
-  protected void configureOutputFormat(Configuration conf) {
-    
-  }
-  
+
+  /**
+   * Method to allow specific implementations to add more elements to the Configuration for reading data from Accumulo.
+   * 
+   * @param conf
+   */
+  protected void configureInputFormat(Configuration conf) {}
+
+  /**
+   * Method to allow specific implementations to add more elements to the Configuration for writing data to Accumulo.
+   * 
+   * @param conf
+   */
+  protected void configureOutputFormat(Configuration conf) {}
+
   @Override
   public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
     return location;
   }
-  
+
   @Override
   public void setUDFContextSignature(String signature) {
     this.contextSignature = signature;
   }
-  
+
   /* StoreFunc methods */
   public void setStoreFuncUDFContextSignature(String signature) {
     this.contextSignature = signature;
-    
+
   }
-  
+
   /**
    * Returns UDFProperties based on <code>contextSignature</code>.
    */
   protected Properties getUDFProperties() {
     return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] {contextSignature});
   }
-  
+
   public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
     return relativeToAbsolutePath(location, curDir);
   }
-  
+
   public void setStoreLocation(String location, Job job) throws IOException {
     conf = job.getConfiguration();
     setLocationFromUri(location);
-//    
-//    Map<String,String> entries = AccumuloOutputFormat.getRelevantEntries(conf);
-//    
-//    Exception e = new Exception("setStoreLocation");
-//    e.printStackTrace(System.out);
-//    System.out.println(entries);
-//    
-//    for (String key : entries.keySet()) {
-//      conf.unset(key);
-//    }
-//    
-//    entries = AccumuloOutputFormat.getRelevantEntries(conf);
-//    System.out.println(entries);
+    
+    // TODO If Pig ever uses a MultipleOutputs-esque construct, this approach will fall apart
     if (conf.get(AccumuloOutputFormat.class.getSimpleName() + ".configured") == null) {
       AccumuloOutputFormat.setOutputInfo(conf, user, password.getBytes(), true, table);
       AccumuloOutputFormat.setZooKeeperInstance(conf, inst, zookeepers);
       AccumuloOutputFormat.setMaxLatency(conf, maxLatency);
       AccumuloOutputFormat.setMaxMutationBufferSize(conf, maxMutationBufferSize);
       AccumuloOutputFormat.setMaxWriteThreads(conf, maxWriteThreads);
-      
+
       LOG.info("Writing data to " + table);
-      
+
       configureOutputFormat(conf);
     }
-    
-//    entries = AccumuloOutputFormat.getRelevantEntries(conf);
-//    System.out.println(entries);
   }
-  
-//  private boolean shouldSetInput(Map<String,String> configEntries) {
-//    Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries);
-//    
-//    for (Map<String,String> group : groupedConfigEntries.values()) {
-//      if (null != inst) {
-//        if (!inst.equals(group.get(INPUT_PREFIX + ".instanceName"))) {
-//          continue;
-//        }
-//      } else if (null != group.get(INPUT_PREFIX + ".instanceName")) {
-//        continue;
-//      }
-//      
-//      if (null != zookeepers) {
-//        if (!zookeepers.equals(group.get(INPUT_PREFIX + ".zooKeepers"))) {
-//          continue;
-//        }
-//      } else if (null != group.get(INPUT_PREFIX + ".zooKeepers")) {
-//        continue;
-//      }
-//      
-//      if (null != user) {
-//        if (!user.equals(group.get(INPUT_PREFIX + ".username"))) {
-//          continue;
-//        }
-//      } else if (null != group.get(INPUT_PREFIX + ".username")) {
-//        continue;
-//      }
-//      
-//      if (null != password) {
-//        if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(INPUT_PREFIX + ".password"))) {
-//          continue;
-//        }
-//      } else if (null != group.get(INPUT_PREFIX + ".password")) {
-//        continue;
-//      }
-//      
-//      if (null != table) {
-//        if (!table.equals(group.get(INPUT_PREFIX + ".tablename"))) {
-//          continue;
-//        }
-//      } else if (null != group.get(INPUT_PREFIX + ".tablename")) {
-//        continue;
-//      }
-//      
-//      if (null != authorizations) {
-//        if (!authorizations.serialize().equals(group.get(INPUT_PREFIX + ".authorizations"))) {
-//          continue;
-//        }
-//      } else if (null != group.get(INPUT_PREFIX + ".authorizations")) {
-//        continue;
-//      }
-//      
-//      String columnValues = group.get(INPUT_PREFIX + ".columns");
-//      if (null != columnFamilyColumnQualifierPairs) {
-//        StringBuilder expected = new StringBuilder(128);
-//        for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
-//          if (0 < expected.length()) {
-//            expected.append(COMMA);
-//          }
-//          
-//          expected.append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst()))));
-//          if (column.getSecond() != null)
-//            expected.append(COLON).append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond()))));
-//        }
-//        
-//        if (!expected.toString().equals(columnValues)) {
-//          continue;
-//        }
-//      } else if (null != columnValues) {
-//        continue;
-//      }
-//      
-//      Range expected = new Range(start, end);
-//      String serializedRanges = group.get(INPUT_PREFIX + ".ranges");
-//      if (null != serializedRanges) {
-//        try {
-//          // We currently only support serializing one Range into the Configuration from this Storage class
-//          ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serializedRanges.getBytes()));
-//          Range range = new Range();
-//          range.readFields(new DataInputStream(bais));
-//          
-//          if (!expected.equals(range)) {
-//            continue;
-//          }
-//        } catch (IOException e) {
-//          // Got an exception, they don't match
-//          continue;
-//        }
-//      }
-//      
-//      // We found a group of entries in the config which are (similar to) what
-//      // we would have set.
-//      return false;
-//    }
-//    
-//    // We didn't find any entries that seemed to match, write the config
-//    return true;
-//  }
-//  
-//  private boolean shouldSetOutput(Map<String,String> configEntries) {
-//    Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries);
-//    
-//    for (Map<String,String> group : groupedConfigEntries.values()) {
-//      if (null != inst) {
-//        if (!inst.equals(group.get(OUTPUT_PREFIX + ".instanceName"))) {
-//          continue;
-//        }
-//      } else if (null != group.get(OUTPUT_PREFIX + ".instanceName")) {
-//        continue;
-//      }
-//      
-//      if (null != zookeepers) {
-//        if (!zookeepers.equals(group.get(OUTPUT_PREFIX + ".zooKeepers"))) {
-//          continue;
-//        }
-//      } else if (null != group.get(OUTPUT_PREFIX + ".zooKeepers")) {
-//        continue;
-//      }
-//      
-//      if (null != user) {
-//        if (!user.equals(group.get(OUTPUT_PREFIX + ".username"))) {
-//          continue;
-//        }
-//      } else if (null != group.get(OUTPUT_PREFIX + ".username")) {
-//        continue;
-//      }
-//      
-//      if (null != password) {
-//        if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(OUTPUT_PREFIX + ".password"))) {
-//          continue;
-//        }
-//      } else if (null != group.get(OUTPUT_PREFIX + ".password")) {
-//        continue;
-//      }
-//      
-//      if (null != table) {
-//        if (!table.equals(group.get(OUTPUT_PREFIX + ".defaulttable"))) {
-//          continue;
-//        }
-//      } else if (null != group.get(OUTPUT_PREFIX + ".defaulttable")) {
-//        continue;
-//      }
-//      
-//      String writeThreadsStr = group.get(OUTPUT_PREFIX + ".writethreads");
-//      try {
-//        if (null == writeThreadsStr || maxWriteThreads != Integer.parseInt(writeThreadsStr)) {
-//          continue;
-//        }
-//      } catch (NumberFormatException e) {
-//        // Wasn't a number, so it can't match what we were expecting
-//        continue;
-//      }
-//      
-//      String mutationBufferStr = group.get(OUTPUT_PREFIX + ".maxmemory");
-//      try {
-//        if (null == mutationBufferStr || maxMutationBufferSize != Long.parseLong(mutationBufferStr)) {
-//          continue;
-//        }
-//      } catch (NumberFormatException e) {
-//        // Wasn't a number, so it can't match what we were expecting
-//        continue;
-//      }
-//      
-//      String maxLatencyStr = group.get(OUTPUT_PREFIX + ".maxlatency");
-//      try {
-//        if (null == maxLatencyStr || maxLatency != Long.parseLong(maxLatencyStr)) {
-//          continue;
-//        }
-//      } catch (NumberFormatException e) {
-//        // Wasn't a number, so it can't match what we were expecting
-//        continue;
-//      }
-//      
-//      // We found a group of entries in the config which are (similar to) what
-//      // we would have set.
-//      return false;
-//    }
-//    
-//    // We didn't find any entries that seemed to match, write the config
-//    return true;
-//  }
-//  
-//  private Map<Integer,Map<String,String>> permuteConfigEntries(Map<String,String> entries) {
-//    Map<Integer,Map<String,String>> groupedEntries = new HashMap<Integer,Map<String,String>>();
-//    for (Entry<String,String> entry : entries.entrySet()) {
-//      final String key = entry.getKey(), value = entry.getValue();
-//      
-//      if (key.endsWith(SequencedFormatHelper.CONFIGURED_SEQUENCES)) {
-//        continue;
-//      }
-//      
-//      int index = key.lastIndexOf(PERIOD);
-//      if (-1 != index) {
-//        int group = Integer.parseInt(key.substring(index + 1));
-//        String name = key.substring(0, index);
-//        
-//        Map<String,String> entriesInGroup = groupedEntries.get(group);
-//        if (null == entriesInGroup) {
-//          entriesInGroup = new HashMap<String,String>();
-//          groupedEntries.put(group, entriesInGroup);
-//        }
-//        
-//        entriesInGroup.put(name, value);
-//      } else {
-//        LOG.warn("Could not parse key: " + key);
-//      }
-//    }
-//    
-//    return groupedEntries;
-//  }
-  
+
   @SuppressWarnings("rawtypes")
   public OutputFormat getOutputFormat() {
     return new AccumuloOutputFormat();
   }
-  
+
   @SuppressWarnings({"rawtypes", "unchecked"})
   public void prepareToWrite(RecordWriter writer) {
     this.writer = writer;
   }
-  
+
   public abstract Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException;
-  
+
   public void putNext(Tuple tuple) throws ExecException, IOException {
     Collection<Mutation> muts = getMutations(tuple);
     for (Mutation mut : muts) {
@@ -557,11 +327,11 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
       }
     }
   }
-  
+
   public void cleanupOnFailure(String failure, Job job) {}
-  
+
   public void cleanupOnSuccess(String location, Job job) {}
-  
+
   @Override
   public void checkSchema(ResourceSchema s) throws IOException {
     if (!(caster instanceof LoadStoreCaster)) {
@@ -571,40 +341,40 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
     schema = s;
     getUDFProperties().setProperty(contextSignature + "_schema", ObjectSerializer.serialize(schema));
   }
-  
+
   protected Text tupleToText(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
     Object o = tuple.get(i);
     byte type = schemaToType(o, i, fieldSchemas);
-    
+
     return objToText(o, type);
   }
-  
+
   protected Text objectToText(Object o, ResourceFieldSchema fieldSchema) throws IOException {
     byte type = schemaToType(o, fieldSchema);
-    
+
     return objToText(o, type);
   }
-  
+
   protected byte schemaToType(Object o, ResourceFieldSchema fieldSchema) {
     return (fieldSchema == null) ? DataType.findType(o) : fieldSchema.getType();
   }
-  
+
   protected byte schemaToType(Object o, int i, ResourceFieldSchema[] fieldSchemas) {
     return (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
   }
-  
+
   protected byte[] tupleToBytes(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
     Object o = tuple.get(i);
     byte type = schemaToType(o, i, fieldSchemas);
-    
+
     return objToBytes(o, type);
-    
+
   }
-  
+
   protected long objToLong(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
     Object o = tuple.get(i);
     byte type = schemaToType(o, i, fieldSchemas);
-    
+
     switch (type) {
       case DataType.LONG:
         return (Long) o;
@@ -629,13 +399,13 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
       case DataType.BIGINTEGER:
         BigInteger bigintTimestamp = (BigInteger) o;
         long longTimestamp = bigintTimestamp.longValue();
-        
+
         BigInteger recreatedTimestamp = BigInteger.valueOf(longTimestamp);
-        
+
         if (!recreatedTimestamp.equals(bigintTimestamp)) {
           LOG.warn("Downcasting BigInteger into Long results in a change of the original value. Was " + bigintTimestamp + " but is now " + longTimestamp);
         }
-        
+
         return longTimestamp;
       case DataType.BIGDECIMAL:
         BigDecimal bigdecimalTimestamp = (BigDecimal) o;
@@ -658,21 +428,21 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
       default:
         LOG.error("Could not convert " + o + " of class " + o.getClass() + " into long.");
         throw new IOException("Could not convert " + o.getClass() + " into long");
-        
+
     }
   }
-  
+
   protected Text objToText(Object o, byte type) throws IOException {
     byte[] bytes = objToBytes(o, type);
-    
+
     if (null == bytes) {
       LOG.warn("Creating empty text from null value");
       return new Text();
     }
-    
+
     return new Text(bytes);
   }
-  
+
   @SuppressWarnings("unchecked")
   protected byte[] objToBytes(Object o, byte type) throws IOException {
     if (o == null)
@@ -700,12 +470,12 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
         return caster.toBytes((Boolean) o);
       case DataType.DATETIME:
         return caster.toBytes((DateTime) o);
-        
+
         // The type conversion here is unchecked.
         // Relying on DataType.findType to do the right thing.
       case DataType.MAP:
         return caster.toBytes((Map<String,Object>) o);
-        
+
       case DataType.NULL:
         return null;
       case DataType.TUPLE: