You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2013/12/31 03:05:33 UTC
[2/2] git commit: ACCUMULO-1783 Try to work around the lack of
Configuration.unset in earlier versions of hadoop.
ACCUMULO-1783 Try to work around the lack of Configuration.unset in earlier versions of hadoop.
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/12357a03
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/12357a03
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/12357a03
Branch: refs/heads/ACCUMULO-1783-1.5
Commit: 12357a036226293b46b334fb6bc5fee15e4c2650
Parents: 122f815
Author: Josh Elser <el...@apache.org>
Authored: Mon Dec 30 14:11:40 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Mon Dec 30 14:11:40 2013 -0500
----------------------------------------------------------------------
.../accumulo/pig/AbstractAccumuloStorage.java | 68 ++++++++++++--
.../pig/AccumuloStorageConfigurationTest.java | 94 ++++++++++++++++++++
2 files changed, 155 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/12357a03/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 801b31f..b3e91fc 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -22,12 +22,13 @@ import java.math.BigInteger;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriterConfig;
@@ -116,6 +117,63 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
protected Map<String,String> getOutputFormatEntries(Configuration conf) {
return getEntries(conf, OUTPUT_PREFIX);
}
+
+ /**
+ * Removes the given entries from the configuration. Probes reflectively for Configuration#unset
+ * and uses it when present; otherwise clears the Configuration and re-adds the other entries.
+ * @param conf the Configuration to remove entries from; it is modified in place
+ * @param entriesToUnset entries whose keys are removed from conf; the values are not consulted
+ */
+ protected void unsetEntriesFromConfiguration(Configuration conf, Map<String,String> entriesToUnset) {
+ boolean configurationHasUnset = true;
+ try {
+ conf.getClass().getMethod("unset", String.class);
+ } catch (NoSuchMethodException e) {
+ configurationHasUnset = false;
+ } catch (SecurityException e) {
+ configurationHasUnset = false;
+ }
+
+ // Only Hadoop >=1.2.0 and >=0.23 actually contain the method Configuration#unset
+ if (configurationHasUnset) {
+ simpleUnset(conf, entriesToUnset);
+ } else {
+ // If we're running on something else, we have to remove everything and re-add it
+ replaceUnset(conf, entriesToUnset);
+ }
+ }
+
+ /**
+ * Unsets each key of the given Map by calling Configuration#unset on the Configuration.
+ * @param conf the Configuration to remove entries from; it is modified in place
+ * @param entriesToUnset entries whose keys are unset; the values are not consulted
+ */
+ protected void simpleUnset(Configuration conf, Map<String,String> entriesToUnset) {
+ for (String key : entriesToUnset.keySet()) {
+ conf.unset(key);
+ }
+ }
+
+ /**
+ * Removes the given entries from the configuration by clearing the Configuration
+ * and re-adding only the elements whose keys aren't in the Map of entries to unset.
+ * @param conf the Configuration to clear and repopulate; it is modified in place
+ * @param entriesToUnset entries whose keys are not re-added; the values are not consulted
+ */
+ protected void replaceUnset(Configuration conf, Map<String,String> entriesToUnset) {
+ // Capture the entries before clear(); assumes iterator() is backed by a copy - TODO confirm
+ Iterator<Entry<String,String>> originalEntries = conf.iterator();
+ conf.clear();
+
+ while (originalEntries.hasNext()) {
+ Entry<String,String> originalEntry = originalEntries.next();
+
+ // Only re-set() the pairs that aren't in our collection of keys to unset
+ if (!entriesToUnset.containsKey(originalEntry.getKey())) {
+ conf.set(originalEntry.getKey(), originalEntry.getValue());
+ }
+ }
+ }
@Override
public Tuple getNext() throws IOException {
@@ -233,9 +291,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
setLocationFromUri(location);
Map<String,String> entries = getInputFormatEntries(job.getConfiguration());
- for (String key : entries.keySet()) {
- job.getConfiguration().unset(key);
- }
+ unsetEntriesFromConfiguration(job.getConfiguration(), entries);
try {
AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(password));
@@ -306,9 +362,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
setLocationFromUri(location);
Map<String,String> entries = getOutputFormatEntries(job.getConfiguration());
- for (String key : entries.keySet()) {
- job.getConfiguration().unset(key);
- }
+ unsetEntriesFromConfiguration(job.getConfiguration(), entries);
try {
AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(password));
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/12357a03/src/test/java/org/apache/accumulo/pig/AccumuloStorageConfigurationTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloStorageConfigurationTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloStorageConfigurationTest.java
new file mode 100644
index 0000000..079cc0c
--- /dev/null
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloStorageConfigurationTest.java
@@ -0,0 +1,94 @@
+package org.apache.accumulo.pig;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AccumuloStorageConfigurationTest {
+ // Checks that simpleUnset and replaceUnset leave a Configuration with the same contents.
+ protected Configuration original;
+ protected AccumuloStorage storage;
+ // Builds the storage under test and a Configuration with three String, one boolean, one long and one int entry.
+ @Before
+ public void setup() {
+ storage = new AccumuloStorage();
+
+ original = new Configuration();
+
+ original.set("string1", "value1");
+ original.set("string2", "value2");
+ original.set("string3", "value3");
+ original.setBoolean("boolean", true);
+ original.setLong("long", 10);
+ original.setInt("integer", 20);
+ }
+ // Collects every entry returned by conf.iterator() into a HashMap so contents can be compared with equals.
+ protected Map<String,String> getContents(Configuration conf) {
+ Map<String,String> contents = new HashMap<String,String>();
+ Iterator<Entry<String,String>> iter = conf.iterator();
+ while (iter.hasNext()) {
+ Entry<String,String> entry = iter.next();
+
+ contents.put(entry.getKey(), entry.getValue());
+ }
+
+ return contents;
+ }
+
+ // Removes two keys added with set() via both strategies and compares against a copy unset directly.
+ @Test
+ public void testEquivalence() {
+ Configuration unsetCopy = new Configuration(original), clearCopy = new Configuration(original);
+
+ Assert.assertEquals(getContents(unsetCopy), getContents(clearCopy));
+
+ Map<String,String> entriesToUnset = new HashMap<String,String>();
+ entriesToUnset.put("string1", "foo");
+ entriesToUnset.put("string3", "bar");
+
+ storage.simpleUnset(unsetCopy, entriesToUnset);
+ storage.replaceUnset(clearCopy, entriesToUnset);
+
+ Assert.assertEquals(getContents(unsetCopy), getContents(clearCopy));
+
+ Configuration originalCopy = new Configuration(original);
+ originalCopy.unset("string1");
+ originalCopy.unset("string3");
+
+ Assert.assertEquals(getContents(originalCopy), getContents(unsetCopy));
+ Assert.assertEquals(getContents(originalCopy), getContents(clearCopy));
+ }
+
+ // Same comparison for the keys added with setLong, setBoolean and setInt.
+ @Test
+ public void testEquivalenceOnTypes() {
+ Configuration unsetCopy = new Configuration(original), clearCopy = new Configuration(original);
+
+ Assert.assertEquals(getContents(unsetCopy), getContents(clearCopy));
+
+ Map<String,String> entriesToUnset = new HashMap<String,String>();
+ entriesToUnset.put("long", "foo");
+ entriesToUnset.put("boolean", "bar");
+ entriesToUnset.put("integer", "foobar");
+
+ storage.simpleUnset(unsetCopy, entriesToUnset);
+ storage.replaceUnset(clearCopy, entriesToUnset);
+
+ Assert.assertEquals(getContents(unsetCopy), getContents(clearCopy));
+
+ Configuration originalCopy = new Configuration(original);
+ originalCopy.unset("long");
+ originalCopy.unset("boolean");
+ originalCopy.unset("integer");
+
+ Assert.assertEquals(getContents(originalCopy), getContents(unsetCopy));
+ Assert.assertEquals(getContents(originalCopy), getContents(clearCopy));
+ }
+
+}