You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2013/12/31 03:05:33 UTC

[2/2] git commit: ACCUMULO-1783 Try to work around the lack of Configuration.unset in earlier versions of hadoop.

ACCUMULO-1783 Try to work around the lack of Configuration.unset in earlier versions of hadoop.


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/12357a03
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/12357a03
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/12357a03

Branch: refs/heads/ACCUMULO-1783-1.5
Commit: 12357a036226293b46b334fb6bc5fee15e4c2650
Parents: 122f815
Author: Josh Elser <el...@apache.org>
Authored: Mon Dec 30 14:11:40 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Mon Dec 30 14:11:40 2013 -0500

----------------------------------------------------------------------
 .../accumulo/pig/AbstractAccumuloStorage.java   | 68 ++++++++++++--
 .../pig/AccumuloStorageConfigurationTest.java   | 94 ++++++++++++++++++++
 2 files changed, 155 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/12357a03/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 801b31f..b3e91fc 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -22,12 +22,13 @@ import java.math.BigInteger;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriterConfig;
@@ -116,6 +117,63 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
   protected Map<String,String> getOutputFormatEntries(Configuration conf) {
     return getEntries(conf, OUTPUT_PREFIX);
   }
+  
+  /**
+   * Removes the keys of {@code entriesToUnset} from {@code conf}, accounting for changes in the
+   * Configuration API given the version of Hadoop being used.
+   * @param conf the Configuration to modify in place
+   * @param entriesToUnset entries whose keys are removed (values are ignored); empty is a no-op
+   */
+  protected void unsetEntriesFromConfiguration(Configuration conf, Map<String,String> entriesToUnset) {
+    if (entriesToUnset.isEmpty()) {
+      return; // nothing to remove; don't let the fallback rewrite the whole Configuration
+    }
+    boolean configurationHasUnset = true;
+    try {
+      conf.getClass().getMethod("unset", String.class); // only Hadoop >=1.2.0 and >=0.23 have it
+    } catch (NoSuchMethodException e) {
+      configurationHasUnset = false;
+    } catch (SecurityException e) {
+      configurationHasUnset = false;
+    }
+    if (configurationHasUnset) {
+      simpleUnset(conf, entriesToUnset);
+    } else {
+      replaceUnset(conf, entriesToUnset); // otherwise remove everything and re-add what remains
+    }
+  }
+  
+  /**
+   * Removes every key of {@code entriesToUnset} from {@code conf} by calling Configuration#unset.
+   * @param conf the Configuration to modify; only call this when Configuration#unset exists at runtime
+   * @param entriesToUnset map whose keys name the properties to remove; its values are not used
+   */
+  protected void simpleUnset(Configuration conf, Map<String,String> entriesToUnset) {
+    for (Iterator<String> keys = entriesToUnset.keySet().iterator(); keys.hasNext();) {
+      conf.unset(keys.next());
+    }
+  }
+  
+  /**
+   * Removes the given keys by clearing the Configuration and re-setting every other key/value pair
+   * returned by {@link Configuration#iterator()}; used when Configuration#unset is unavailable.
+   * @param conf the Configuration to modify in place
+   * @param entriesToUnset map whose keys name the properties to drop; its values are not used
+   */
+  protected void replaceUnset(Configuration conf, Map<String,String> entriesToUnset) {
+    // Copy the pairs to keep before clear(); don't rely on iterator() returning a detached snapshot
+    Map<String,String> retained = new HashMap<String,String>();
+    for (Iterator<Entry<String,String>> it = conf.iterator(); it.hasNext();) {
+      Entry<String,String> entry = it.next();
+      if (!entriesToUnset.containsKey(entry.getKey())) {
+        retained.put(entry.getKey(), entry.getValue());
+      }
+    }
+    conf.clear();
+    for (Entry<String,String> entry : retained.entrySet()) {
+      conf.set(entry.getKey(), entry.getValue());
+    }
+  }
 
   @Override
   public Tuple getNext() throws IOException {
@@ -233,9 +291,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
     setLocationFromUri(location);
 
     Map<String,String> entries = getInputFormatEntries(job.getConfiguration());
-    for (String key : entries.keySet()) {
-      job.getConfiguration().unset(key);
-    }
+    unsetEntriesFromConfiguration(job.getConfiguration(), entries);
 
     try {
       AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(password));
@@ -306,9 +362,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
     setLocationFromUri(location);
     
     Map<String,String> entries = getOutputFormatEntries(job.getConfiguration());
-    for (String key : entries.keySet()) {
-      job.getConfiguration().unset(key);
-    }
+    unsetEntriesFromConfiguration(job.getConfiguration(), entries);
 
     try {
       AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(password));

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/12357a03/src/test/java/org/apache/accumulo/pig/AccumuloStorageConfigurationTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloStorageConfigurationTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloStorageConfigurationTest.java
new file mode 100644
index 0000000..079cc0c
--- /dev/null
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloStorageConfigurationTest.java
@@ -0,0 +1,94 @@
+package org.apache.accumulo.pig;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AccumuloStorageConfigurationTest {
+
+  protected Configuration original;
+  protected AccumuloStorage storage;
+  
+  @Before
+  public void setup() {
+    storage = new AccumuloStorage();
+    
+    original = new Configuration();
+    
+    original.set("string1", "value1");
+    original.set("string2", "value2");
+    original.set("string3", "value3");
+    original.setBoolean("boolean", true);
+    original.setLong("long", 10);
+    original.setInt("integer", 20);
+  }
+  
+  /** Snapshots the key/value pairs exposed by {@link Configuration#iterator()} into a new map. */
+  protected Map<String,String> getContents(Configuration conf) {
+    Map<String,String> contents = new HashMap<String,String>();
+    Iterator<Entry<String,String>> iter = conf.iterator();
+    while (iter.hasNext()) {
+      Entry<String,String> entry = iter.next();
+      contents.put(entry.getKey(), entry.getValue());
+    }
+    return contents;
+  }
+  
+  /** Whether the Hadoop on the test classpath provides Configuration#unset. */
+  protected static boolean configurationHasUnset() {
+    try {
+      Configuration.class.getMethod("unset", String.class);
+      return true;
+    } catch (NoSuchMethodException e) {
+      return false;
+    }
+  }
+  
+  /**
+   * Removes the keys of entriesToUnset from copies of the original Configuration and compares the
+   * result with the expected contents. The expectation is computed without Configuration#unset and
+   * simpleUnset is only exercised when unset exists, so the test also runs on older Hadoop versions.
+   */
+  protected void assertUnsetRemoves(Map<String,String> entriesToUnset) {
+    Map<String,String> expected = getContents(original);
+    // Guard against a vacuous test: every key we unset must have been present to begin with
+    Assert.assertTrue(expected.keySet().containsAll(entriesToUnset.keySet()));
+    expected.keySet().removeAll(entriesToUnset.keySet());
+    
+    Configuration clearCopy = new Configuration(original);
+    storage.replaceUnset(clearCopy, entriesToUnset);
+    Assert.assertEquals(expected, getContents(clearCopy));
+    
+    if (configurationHasUnset()) {
+      Configuration unsetCopy = new Configuration(original);
+      storage.simpleUnset(unsetCopy, entriesToUnset);
+      Assert.assertEquals(expected, getContents(unsetCopy));
+    }
+  }
+  
+  @Test
+  public void testEquivalence() {
+    Map<String,String> entriesToUnset = new HashMap<String,String>();
+    entriesToUnset.put("string1", "foo");
+    entriesToUnset.put("string3", "bar");
+    
+    assertUnsetRemoves(entriesToUnset);
+  }
+  
+  @Test
+  public void testEquivalenceOnTypes() {
+    Map<String,String> entriesToUnset = new HashMap<String,String>();
+    entriesToUnset.put("long", "foo");
+    entriesToUnset.put("boolean", "bar");
+    entriesToUnset.put("integer", "foobar");
+    
+    assertUnsetRemoves(entriesToUnset);
+  }
+
+}