You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2013/11/09 04:37:25 UTC
[1/2] git commit: ACCUMULO-1783 Rework the setLocation method to
ignore the Configuration so we don't shoot ourselves in the foot.
Updated Branches:
refs/heads/ACCUMULO-1783 22498f775 -> 4160c1615
ACCUMULO-1783 Rework the setLocation method to ignore the Configuration
so we don't shoot ourselves in the foot.
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/63d29d4d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/63d29d4d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/63d29d4d
Branch: refs/heads/ACCUMULO-1783
Commit: 63d29d4de56f1cc664b5a433dc2d736e3bc7a066
Parents: 22498f7
Author: Josh Elser <el...@apache.org>
Authored: Fri Nov 8 19:50:19 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Fri Nov 8 19:50:19 2013 -0500
----------------------------------------------------------------------
.../accumulo/pig/AbstractAccumuloStorage.java | 549 ++++++++++---------
.../apache/accumulo/pig/AccumuloStorage.java | 6 +-
.../accumulo/pig/AccumuloWholeRowStorage.java | 4 +-
.../pig/AbstractAccumuloStorageTest.java | 20 +-
.../pig/AccumuloWholeRowStorageTest.java | 5 +-
5 files changed, 299 insertions(+), 285 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/63d29d4d/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 1d53371..5faf6c6 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -16,8 +16,6 @@
*/
package org.apache.accumulo.pig;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
@@ -32,15 +30,12 @@ import java.util.Properties;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.core.client.mapreduce.SequencedFormatHelper;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.Pair;
-import org.apache.accumulo.core.util.TextUtil;
-import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -207,45 +202,66 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
return writer;
}
+ protected Map<String,String> getInputFormatEntries(Configuration conf) {
+ return getEntries(conf, INPUT_PREFIX);
+ }
+
+ protected Map<String,String> getEntries(Configuration conf, String prefix) {
+ Map<String,String> entries = new HashMap<String,String>();
+
+ for (Entry<String,String> entry : conf) {
+ String key = entry.getKey();
+ if (key.startsWith(prefix)) {
+ entries.put(key, entry.getValue());
+ }
+ }
+
+ return entries;
+ }
+
+
@Override
public void setLocation(String location, Job job) throws IOException {
conf = job.getConfiguration();
setLocationFromUri(location);
- Map<String,String> entries = AccumuloInputFormat.getRelevantEntries(conf);
-
- if (shouldSetInput(entries)) {
- int sequence = AccumuloInputFormat.nextSequence(conf);
-
- // TODO Something more.. "certain".
- if (conf.getBoolean(AccumuloInputFormat.class.getSimpleName() + ".configured." + sequence, false)) {
- LOG.warn("InputFormat already configured for " + sequence);
- return;
- // throw new RuntimeException("Was provided sequence number which was already configured: " + sequence);
- }
-
- AccumuloInputFormat.setInputInfo(conf, sequence, user, password.getBytes(), table, authorizations);
- AccumuloInputFormat.setZooKeeperInstance(conf, sequence, inst, zookeepers);
- if (columnFamilyColumnQualifierPairs.size() > 0) {
- LOG.info("columns: " + columnFamilyColumnQualifierPairs);
- AccumuloInputFormat.fetchColumns(conf, sequence, columnFamilyColumnQualifierPairs);
- }
-
- Collection<Range> ranges = Collections.singleton(new Range(start, end));
-
- LOG.info("Scanning Accumulo for " + ranges);
-
- AccumuloInputFormat.setRanges(conf, sequence, ranges);
-
- configureInputFormat(conf, sequence);
+ Map<String,String> entries = getInputFormatEntries(conf);
+
+ Exception e = new Exception("setLocation");
+ e.printStackTrace(System.out);
+ System.out.println(entries);
+
+ for (String key : entries.keySet()) {
+ conf.unset(key);
+ }
+
+ entries = getInputFormatEntries(conf);
+ System.out.println(entries);
+
+ AccumuloInputFormat.setInputInfo(conf, user, password.getBytes(), table, authorizations);
+ AccumuloInputFormat.setZooKeeperInstance(conf, inst, zookeepers);
+ if (columnFamilyColumnQualifierPairs.size() > 0) {
+ LOG.info("columns: " + columnFamilyColumnQualifierPairs);
+ AccumuloInputFormat.fetchColumns(conf, columnFamilyColumnQualifierPairs);
}
+
+ Collection<Range> ranges = Collections.singleton(new Range(start, end));
+
+ LOG.info("Scanning Accumulo for " + ranges + " for table " + table);
+
+ AccumuloInputFormat.setRanges(conf, ranges);
+
+ configureInputFormat(conf);
+
+ entries = getInputFormatEntries(conf);
+ System.out.println(entries);
}
- protected void configureInputFormat(Configuration conf, int sequence) {
+ protected void configureInputFormat(Configuration conf) {
}
- protected void configureOutputFormat(Configuration conf, int sequence) {
+ protected void configureOutputFormat(Configuration conf) {
}
@@ -279,242 +295,245 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
public void setStoreLocation(String location, Job job) throws IOException {
conf = job.getConfiguration();
setLocationFromUri(location);
-
- Map<String,String> entries = AccumuloOutputFormat.getRelevantEntries(conf);
-
- if (shouldSetOutput(entries)) {
- int sequence = AccumuloOutputFormat.nextSequence(conf);
-
- // TODO Something more.. "certain".
- if (conf.getBoolean(AccumuloOutputFormat.class.getSimpleName() + ".configured." + sequence, false)) {
- LOG.warn("OutputFormat already configured for " + sequence);
- return;
- // throw new RuntimeException("Was provided sequence number which was already configured: " + sequence);
- }
-
- AccumuloOutputFormat.setOutputInfo(conf, sequence, user, password.getBytes(), true, table);
- AccumuloOutputFormat.setZooKeeperInstance(conf, sequence, inst, zookeepers);
- AccumuloOutputFormat.setMaxLatency(conf, sequence, maxLatency);
- AccumuloOutputFormat.setMaxMutationBufferSize(conf, sequence, maxMutationBufferSize);
- AccumuloOutputFormat.setMaxWriteThreads(conf, sequence, maxWriteThreads);
-
- configureOutputFormat(conf, sequence);
- } else {
- LOG.debug("Not setting configuration as it appears that it has already been set");
+//
+// Map<String,String> entries = AccumuloOutputFormat.getRelevantEntries(conf);
+//
+// Exception e = new Exception("setStoreLocation");
+// e.printStackTrace(System.out);
+// System.out.println(entries);
+//
+// for (String key : entries.keySet()) {
+// conf.unset(key);
+// }
+//
+// entries = AccumuloOutputFormat.getRelevantEntries(conf);
+// System.out.println(entries);
+ if (conf.get(AccumuloOutputFormat.class.getSimpleName() + ".configured") == null) {
+ AccumuloOutputFormat.setOutputInfo(conf, user, password.getBytes(), true, table);
+ AccumuloOutputFormat.setZooKeeperInstance(conf, inst, zookeepers);
+ AccumuloOutputFormat.setMaxLatency(conf, maxLatency);
+ AccumuloOutputFormat.setMaxMutationBufferSize(conf, maxMutationBufferSize);
+ AccumuloOutputFormat.setMaxWriteThreads(conf, maxWriteThreads);
+
+ LOG.info("Writing data to " + table);
+
+ configureOutputFormat(conf);
}
- }
-
- private boolean shouldSetInput(Map<String,String> configEntries) {
- Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries);
- for (Map<String,String> group : groupedConfigEntries.values()) {
- if (null != inst) {
- if (!inst.equals(group.get(INPUT_PREFIX + ".instanceName"))) {
- continue;
- }
- } else if (null != group.get(INPUT_PREFIX + ".instanceName")) {
- continue;
- }
-
- if (null != zookeepers) {
- if (!zookeepers.equals(group.get(INPUT_PREFIX + ".zooKeepers"))) {
- continue;
- }
- } else if (null != group.get(INPUT_PREFIX + ".zooKeepers")) {
- continue;
- }
-
- if (null != user) {
- if (!user.equals(group.get(INPUT_PREFIX + ".username"))) {
- continue;
- }
- } else if (null != group.get(INPUT_PREFIX + ".username")) {
- continue;
- }
-
- if (null != password) {
- if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(INPUT_PREFIX + ".password"))) {
- continue;
- }
- } else if (null != group.get(INPUT_PREFIX + ".password")) {
- continue;
- }
-
- if (null != table) {
- if (!table.equals(group.get(INPUT_PREFIX + ".tablename"))) {
- continue;
- }
- } else if (null != group.get(INPUT_PREFIX + ".tablename")) {
- continue;
- }
-
- if (null != authorizations) {
- if (!authorizations.serialize().equals(group.get(INPUT_PREFIX + ".authorizations"))) {
- continue;
- }
- } else if (null != group.get(INPUT_PREFIX + ".authorizations")) {
- continue;
- }
-
- String columnValues = group.get(INPUT_PREFIX + ".columns");
- if (null != columnFamilyColumnQualifierPairs) {
- StringBuilder expected = new StringBuilder(128);
- for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
- if (0 < expected.length()) {
- expected.append(COMMA);
- }
-
- expected.append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst()))));
- if (column.getSecond() != null)
- expected.append(COLON).append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond()))));
- }
-
- if (!expected.toString().equals(columnValues)) {
- continue;
- }
- } else if (null != columnValues){
- continue;
- }
-
- Range expected = new Range(start, end);
- String serializedRanges = group.get(INPUT_PREFIX + ".ranges");
- if (null != serializedRanges) {
- try {
- // We currently only support serializing one Range into the Configuration from this Storage class
- ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serializedRanges.getBytes()));
- Range range = new Range();
- range.readFields(new DataInputStream(bais));
-
- if (!expected.equals(range)) {
- continue;
- }
- } catch (IOException e) {
- // Got an exception, they don't match
- continue;
- }
- }
-
-
- // We found a group of entries in the config which are (similar to) what
- // we would have set.
- return false;
- }
-
- // We didn't find any entries that seemed to match, write the config
- return true;
+// entries = AccumuloOutputFormat.getRelevantEntries(conf);
+// System.out.println(entries);
}
- private boolean shouldSetOutput(Map<String,String> configEntries) {
- Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries);
-
- for (Map<String,String> group : groupedConfigEntries.values()) {
- if (null != inst) {
- if (!inst.equals(group.get(OUTPUT_PREFIX + ".instanceName"))) {
- continue;
- }
- } else if (null != group.get(OUTPUT_PREFIX + ".instanceName")) {
- continue;
- }
-
- if (null != zookeepers) {
- if (!zookeepers.equals(group.get(OUTPUT_PREFIX + ".zooKeepers"))) {
- continue;
- }
- } else if (null != group.get(OUTPUT_PREFIX + ".zooKeepers")) {
- continue;
- }
-
- if (null != user) {
- if (!user.equals(group.get(OUTPUT_PREFIX + ".username"))) {
- continue;
- }
- } else if (null != group.get(OUTPUT_PREFIX + ".username")) {
- continue;
- }
-
- if (null != password) {
- if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(OUTPUT_PREFIX + ".password"))) {
- continue;
- }
- } else if (null != group.get(OUTPUT_PREFIX + ".password")) {
- continue;
- }
-
- if (null != table) {
- if (!table.equals(group.get(OUTPUT_PREFIX + ".defaulttable"))) {
- continue;
- }
- } else if (null != group.get(OUTPUT_PREFIX + ".defaulttable")) {
- continue;
- }
-
- String writeThreadsStr = group.get(OUTPUT_PREFIX + ".writethreads");
- try {
- if (null == writeThreadsStr || maxWriteThreads != Integer.parseInt(writeThreadsStr)) {
- continue;
- }
- } catch (NumberFormatException e) {
- // Wasn't a number, so it can't match what we were expecting
- continue;
- }
-
- String mutationBufferStr = group.get(OUTPUT_PREFIX + ".maxmemory");
- try {
- if (null == mutationBufferStr || maxMutationBufferSize != Long.parseLong(mutationBufferStr)) {
- continue;
- }
- } catch (NumberFormatException e) {
- // Wasn't a number, so it can't match what we were expecting
- continue;
- }
-
- String maxLatencyStr = group.get(OUTPUT_PREFIX + ".maxlatency");
- try {
- if (null == maxLatencyStr || maxLatency != Long.parseLong(maxLatencyStr)) {
- continue;
- }
- } catch (NumberFormatException e) {
- // Wasn't a number, so it can't match what we were expecting
- continue;
- }
-
- // We found a group of entries in the config which are (similar to) what
- // we would have set.
- return false;
- }
-
- // We didn't find any entries that seemed to match, write the config
- return true;
- }
-
- private Map<Integer,Map<String,String>> permuteConfigEntries(Map<String,String> entries) {
- Map<Integer,Map<String,String>> groupedEntries = new HashMap<Integer,Map<String,String>>();
- for (Entry<String,String> entry : entries.entrySet()) {
- final String key = entry.getKey(), value = entry.getValue();
-
- if (key.endsWith(SequencedFormatHelper.CONFIGURED_SEQUENCES)) {
- continue;
- }
-
- int index = key.lastIndexOf(PERIOD);
- if (-1 != index) {
- int group = Integer.parseInt(key.substring(index + 1));
- String name = key.substring(0, index);
-
- Map<String,String> entriesInGroup = groupedEntries.get(group);
- if (null == entriesInGroup) {
- entriesInGroup = new HashMap<String,String>();
- groupedEntries.put(group, entriesInGroup);
- }
-
- entriesInGroup.put(name, value);
- } else {
- LOG.warn("Could not parse key: " + key);
- }
- }
-
- return groupedEntries;
- }
+// private boolean shouldSetInput(Map<String,String> configEntries) {
+// Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries);
+//
+// for (Map<String,String> group : groupedConfigEntries.values()) {
+// if (null != inst) {
+// if (!inst.equals(group.get(INPUT_PREFIX + ".instanceName"))) {
+// continue;
+// }
+// } else if (null != group.get(INPUT_PREFIX + ".instanceName")) {
+// continue;
+// }
+//
+// if (null != zookeepers) {
+// if (!zookeepers.equals(group.get(INPUT_PREFIX + ".zooKeepers"))) {
+// continue;
+// }
+// } else if (null != group.get(INPUT_PREFIX + ".zooKeepers")) {
+// continue;
+// }
+//
+// if (null != user) {
+// if (!user.equals(group.get(INPUT_PREFIX + ".username"))) {
+// continue;
+// }
+// } else if (null != group.get(INPUT_PREFIX + ".username")) {
+// continue;
+// }
+//
+// if (null != password) {
+// if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(INPUT_PREFIX + ".password"))) {
+// continue;
+// }
+// } else if (null != group.get(INPUT_PREFIX + ".password")) {
+// continue;
+// }
+//
+// if (null != table) {
+// if (!table.equals(group.get(INPUT_PREFIX + ".tablename"))) {
+// continue;
+// }
+// } else if (null != group.get(INPUT_PREFIX + ".tablename")) {
+// continue;
+// }
+//
+// if (null != authorizations) {
+// if (!authorizations.serialize().equals(group.get(INPUT_PREFIX + ".authorizations"))) {
+// continue;
+// }
+// } else if (null != group.get(INPUT_PREFIX + ".authorizations")) {
+// continue;
+// }
+//
+// String columnValues = group.get(INPUT_PREFIX + ".columns");
+// if (null != columnFamilyColumnQualifierPairs) {
+// StringBuilder expected = new StringBuilder(128);
+// for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
+// if (0 < expected.length()) {
+// expected.append(COMMA);
+// }
+//
+// expected.append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst()))));
+// if (column.getSecond() != null)
+// expected.append(COLON).append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond()))));
+// }
+//
+// if (!expected.toString().equals(columnValues)) {
+// continue;
+// }
+// } else if (null != columnValues) {
+// continue;
+// }
+//
+// Range expected = new Range(start, end);
+// String serializedRanges = group.get(INPUT_PREFIX + ".ranges");
+// if (null != serializedRanges) {
+// try {
+// // We currently only support serializing one Range into the Configuration from this Storage class
+// ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serializedRanges.getBytes()));
+// Range range = new Range();
+// range.readFields(new DataInputStream(bais));
+//
+// if (!expected.equals(range)) {
+// continue;
+// }
+// } catch (IOException e) {
+// // Got an exception, they don't match
+// continue;
+// }
+// }
+//
+// // We found a group of entries in the config which are (similar to) what
+// // we would have set.
+// return false;
+// }
+//
+// // We didn't find any entries that seemed to match, write the config
+// return true;
+// }
+//
+// private boolean shouldSetOutput(Map<String,String> configEntries) {
+// Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries);
+//
+// for (Map<String,String> group : groupedConfigEntries.values()) {
+// if (null != inst) {
+// if (!inst.equals(group.get(OUTPUT_PREFIX + ".instanceName"))) {
+// continue;
+// }
+// } else if (null != group.get(OUTPUT_PREFIX + ".instanceName")) {
+// continue;
+// }
+//
+// if (null != zookeepers) {
+// if (!zookeepers.equals(group.get(OUTPUT_PREFIX + ".zooKeepers"))) {
+// continue;
+// }
+// } else if (null != group.get(OUTPUT_PREFIX + ".zooKeepers")) {
+// continue;
+// }
+//
+// if (null != user) {
+// if (!user.equals(group.get(OUTPUT_PREFIX + ".username"))) {
+// continue;
+// }
+// } else if (null != group.get(OUTPUT_PREFIX + ".username")) {
+// continue;
+// }
+//
+// if (null != password) {
+// if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(OUTPUT_PREFIX + ".password"))) {
+// continue;
+// }
+// } else if (null != group.get(OUTPUT_PREFIX + ".password")) {
+// continue;
+// }
+//
+// if (null != table) {
+// if (!table.equals(group.get(OUTPUT_PREFIX + ".defaulttable"))) {
+// continue;
+// }
+// } else if (null != group.get(OUTPUT_PREFIX + ".defaulttable")) {
+// continue;
+// }
+//
+// String writeThreadsStr = group.get(OUTPUT_PREFIX + ".writethreads");
+// try {
+// if (null == writeThreadsStr || maxWriteThreads != Integer.parseInt(writeThreadsStr)) {
+// continue;
+// }
+// } catch (NumberFormatException e) {
+// // Wasn't a number, so it can't match what we were expecting
+// continue;
+// }
+//
+// String mutationBufferStr = group.get(OUTPUT_PREFIX + ".maxmemory");
+// try {
+// if (null == mutationBufferStr || maxMutationBufferSize != Long.parseLong(mutationBufferStr)) {
+// continue;
+// }
+// } catch (NumberFormatException e) {
+// // Wasn't a number, so it can't match what we were expecting
+// continue;
+// }
+//
+// String maxLatencyStr = group.get(OUTPUT_PREFIX + ".maxlatency");
+// try {
+// if (null == maxLatencyStr || maxLatency != Long.parseLong(maxLatencyStr)) {
+// continue;
+// }
+// } catch (NumberFormatException e) {
+// // Wasn't a number, so it can't match what we were expecting
+// continue;
+// }
+//
+// // We found a group of entries in the config which are (similar to) what
+// // we would have set.
+// return false;
+// }
+//
+// // We didn't find any entries that seemed to match, write the config
+// return true;
+// }
+//
+// private Map<Integer,Map<String,String>> permuteConfigEntries(Map<String,String> entries) {
+// Map<Integer,Map<String,String>> groupedEntries = new HashMap<Integer,Map<String,String>>();
+// for (Entry<String,String> entry : entries.entrySet()) {
+// final String key = entry.getKey(), value = entry.getValue();
+//
+// if (key.endsWith(SequencedFormatHelper.CONFIGURED_SEQUENCES)) {
+// continue;
+// }
+//
+// int index = key.lastIndexOf(PERIOD);
+// if (-1 != index) {
+// int group = Integer.parseInt(key.substring(index + 1));
+// String name = key.substring(0, index);
+//
+// Map<String,String> entriesInGroup = groupedEntries.get(group);
+// if (null == entriesInGroup) {
+// entriesInGroup = new HashMap<String,String>();
+// groupedEntries.put(group, entriesInGroup);
+// }
+//
+// entriesInGroup.put(name, value);
+// } else {
+// LOG.warn("Could not parse key: " + key);
+// }
+// }
+//
+// return groupedEntries;
+// }
@SuppressWarnings("rawtypes")
public OutputFormat getOutputFormat() {
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/63d29d4d/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index bd43dce..8e9cfef 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -73,7 +73,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
@Override
protected Tuple getTuple(Key key, Value value) throws IOException {
-
+// System.out.println(key);
SortedMap<Key,Value> rowKVs = WholeRowIterator.decodeRow(key, value);
List<Object> tupleEntries = Lists.newLinkedList();
@@ -142,8 +142,8 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
}
@Override
- protected void configureInputFormat(Configuration conf, int sequence) {
- AccumuloInputFormat.addIterator(conf, sequence, new IteratorSetting(50, WholeRowIterator.class));
+ protected void configureInputFormat(Configuration conf) {
+ AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class));
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/63d29d4d/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
index 499558f..a6db638 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
@@ -84,8 +84,8 @@ public class AccumuloWholeRowStorage extends AbstractAccumuloStorage {
}
@Override
- protected void configureInputFormat(Configuration conf, int sequence) {
- AccumuloInputFormat.addIterator(conf, sequence, new IteratorSetting(50, WholeRowIterator.class));
+ protected void configureInputFormat(Configuration conf) {
+ AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class));
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/63d29d4d/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
index bc886ec..1b5b81a 100644
--- a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
@@ -44,11 +44,10 @@ public class AbstractAccumuloStorageTest {
Job expected = new Job();
Configuration expectedConf = expected.getConfiguration();
- final int sequence = AccumuloInputFormat.nextSequence(expectedConf);
- AccumuloInputFormat.setInputInfo(expectedConf, sequence, user, password.getBytes(), table, authorizations);
- AccumuloInputFormat.setZooKeeperInstance(expectedConf, sequence, inst, zookeepers);
- AccumuloInputFormat.fetchColumns(expectedConf, sequence, columnFamilyColumnQualifierPairs);
- AccumuloInputFormat.setRanges(expectedConf, sequence, ranges);
+ AccumuloInputFormat.setInputInfo(expectedConf, user, password.getBytes(), table, authorizations);
+ AccumuloInputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers);
+ AccumuloInputFormat.fetchColumns(expectedConf, columnFamilyColumnQualifierPairs);
+ AccumuloInputFormat.setRanges(expectedConf, ranges);
return expected;
}
@@ -75,12 +74,11 @@ public class AbstractAccumuloStorageTest {
int maxWriteLatencyMS) throws IOException {
Job expected = new Job();
Configuration expectedConf = expected.getConfiguration();
- final int sequence = AccumuloOutputFormat.nextSequence(expectedConf);
- AccumuloOutputFormat.setOutputInfo(expectedConf, sequence,user, password.getBytes(), true, table);
- AccumuloOutputFormat.setZooKeeperInstance(expectedConf, sequence,inst, zookeepers);
- AccumuloOutputFormat.setMaxLatency(expectedConf, sequence,maxWriteLatencyMS);
- AccumuloOutputFormat.setMaxMutationBufferSize(expectedConf, sequence,maxWriteBufferSize);
- AccumuloOutputFormat.setMaxWriteThreads(expectedConf, sequence,writeThreads);
+ AccumuloOutputFormat.setOutputInfo(expectedConf, user, password.getBytes(), true, table);
+ AccumuloOutputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers);
+ AccumuloOutputFormat.setMaxLatency(expectedConf, maxWriteLatencyMS);
+ AccumuloOutputFormat.setMaxMutationBufferSize(expectedConf, maxWriteBufferSize);
+ AccumuloOutputFormat.setMaxWriteThreads(expectedConf, writeThreads);
return expected;
}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/63d29d4d/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
index b57c7ba..690d86c 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
@@ -56,12 +56,9 @@ public class AccumuloWholeRowStorageTest {
s.setLocation(test.getDefaultLoadLocation(), actual);
Configuration actualConf = actual.getConfiguration();
- // A little brittle. We know this is the sequence number used when we create the DefaultExpectedLoadJob()
- final int sequence = 1;
-
Job expected = test.getDefaultExpectedLoadJob();
Configuration expectedConf = expected.getConfiguration();
- AccumuloInputFormat.addIterator(expectedConf, sequence, new IteratorSetting(50, WholeRowIterator.class));
+ AccumuloInputFormat.addIterator(expectedConf, new IteratorSetting(50, WholeRowIterator.class));
TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
}
[2/2] git commit: ACCUMULO-1783 Clean up now dead or unnecessary code.
Posted by el...@apache.org.
ACCUMULO-1783 Clean up now dead or unnecessary code.
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/4160c161
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/4160c161
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/4160c161
Branch: refs/heads/ACCUMULO-1783
Commit: 4160c1615010a626beedc318fcaaaef06a258068
Parents: 63d29d4
Author: Josh Elser <el...@apache.org>
Authored: Fri Nov 8 21:27:41 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Fri Nov 8 21:27:41 2013 -0500
----------------------------------------------------------------------
.../accumulo/pig/AbstractAccumuloStorage.java | 398 ++++---------------
1 file changed, 84 insertions(+), 314 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/4160c161/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 5faf6c6..a829d4a 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -73,14 +73,14 @@ import org.joda.time.DateTime;
*/
public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreFuncInterface {
private static final Log LOG = LogFactory.getLog(AbstractAccumuloStorage.class);
-
- private static final String COLON = ":", COMMA = ",", PERIOD = ".";
- private static final String INPUT_PREFIX = AccumuloInputFormat.class.getSimpleName(), OUTPUT_PREFIX = AccumuloOutputFormat.class.getSimpleName();
-
+
+ private static final String COLON = ":", COMMA = ",";
+ private static final String INPUT_PREFIX = AccumuloInputFormat.class.getSimpleName();
+
private Configuration conf;
private RecordReader<Key,Value> reader;
private RecordWriter<Text,Mutation> writer;
-
+
String inst;
String zookeepers;
String user;
@@ -90,27 +90,27 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
String auths;
Authorizations authorizations;
List<Pair<Text,Text>> columnFamilyColumnQualifierPairs = new LinkedList<Pair<Text,Text>>();
-
+
String start = null;
String end = null;
-
+
int maxWriteThreads = 10;
long maxMutationBufferSize = 10 * 1000 * 1000;
int maxLatency = 10 * 1000;
-
+
protected LoadStoreCaster caster;
protected ResourceSchema schema;
protected String contextSignature = null;
-
+
public AbstractAccumuloStorage() {}
-
+
@Override
public Tuple getNext() throws IOException {
try {
// load the next pair
if (!reader.nextKeyValue())
return null;
-
+
Key key = (Key) reader.getCurrentKey();
Value value = (Value) reader.getCurrentValue();
assert key != null && value != null;
@@ -119,21 +119,21 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
throw new IOException(e.getMessage());
}
}
-
+
protected abstract Tuple getTuple(Key key, Value value) throws IOException;
-
+
@Override
@SuppressWarnings("rawtypes")
public InputFormat getInputFormat() {
return new AccumuloInputFormat();
}
-
+
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public void prepareToRead(RecordReader reader, PigSplit split) {
this.reader = reader;
}
-
+
private void setLocationFromUri(String location) throws IOException {
// ex:
// accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&fetch_columns=col1:cq1,col2:cq2&start=abc&end=z
@@ -172,13 +172,13 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
String[] parts = urlParts[0].split("/+");
table = parts[1];
tableName = new Text(table);
-
+
if (auths == null || auths.equals("")) {
authorizations = new Authorizations();
} else {
authorizations = new Authorizations(auths.split(COMMA));
}
-
+
if (!StringUtils.isEmpty(columns)) {
for (String cfCq : columns.split(COMMA)) {
if (cfCq.contains(COLON)) {
@@ -189,7 +189,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
}
}
}
-
+
} catch (Exception e) {
throw new IOException(
"Expected 'accumulo://<table>[?instance=<instanceName>&user=<user>&password=<password>&zookeepers=<zookeepers>&auths=<authorizations>&"
@@ -197,356 +197,126 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
+ e.getMessage());
}
}
-
+
protected RecordWriter<Text,Mutation> getWriter() {
return writer;
}
-
+
protected Map<String,String> getInputFormatEntries(Configuration conf) {
return getEntries(conf, INPUT_PREFIX);
}
-
+
protected Map<String,String> getEntries(Configuration conf, String prefix) {
Map<String,String> entries = new HashMap<String,String>();
-
+
for (Entry<String,String> entry : conf) {
String key = entry.getKey();
if (key.startsWith(prefix)) {
entries.put(key, entry.getValue());
}
}
-
+
return entries;
}
-
-
+
@Override
public void setLocation(String location, Job job) throws IOException {
conf = job.getConfiguration();
setLocationFromUri(location);
-
+
Map<String,String> entries = getInputFormatEntries(conf);
-
- Exception e = new Exception("setLocation");
- e.printStackTrace(System.out);
- System.out.println(entries);
-
+
for (String key : entries.keySet()) {
conf.unset(key);
}
-
- entries = getInputFormatEntries(conf);
- System.out.println(entries);
-
+
AccumuloInputFormat.setInputInfo(conf, user, password.getBytes(), table, authorizations);
AccumuloInputFormat.setZooKeeperInstance(conf, inst, zookeepers);
if (columnFamilyColumnQualifierPairs.size() > 0) {
LOG.info("columns: " + columnFamilyColumnQualifierPairs);
AccumuloInputFormat.fetchColumns(conf, columnFamilyColumnQualifierPairs);
}
-
+
Collection<Range> ranges = Collections.singleton(new Range(start, end));
-
+
LOG.info("Scanning Accumulo for " + ranges + " for table " + table);
-
+
AccumuloInputFormat.setRanges(conf, ranges);
-
+
configureInputFormat(conf);
-
- entries = getInputFormatEntries(conf);
- System.out.println(entries);
- }
-
- protected void configureInputFormat(Configuration conf) {
-
}
-
- protected void configureOutputFormat(Configuration conf) {
-
- }
-
+
+ /**
+ * Method to allow specific implementations to add more elements to the Configuration for reading data from Accumulo.
+ *
+ * @param conf the job Configuration to which implementations may add entries used when reading from Accumulo
+ */
+ protected void configureInputFormat(Configuration conf) {}
+
+ /**
+ * Method to allow specific implementations to add more elements to the Configuration for writing data to Accumulo.
+ *
+ * @param conf the job Configuration to which implementations may add entries used when writing to Accumulo
+ */
+ protected void configureOutputFormat(Configuration conf) {}
+
@Override
public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
return location;
}
-
+
@Override
public void setUDFContextSignature(String signature) {
this.contextSignature = signature;
}
-
+
/* StoreFunc methods */
public void setStoreFuncUDFContextSignature(String signature) {
this.contextSignature = signature;
-
+
}
-
+
/**
* Returns UDFProperties based on <code>contextSignature</code>.
*/
protected Properties getUDFProperties() {
return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] {contextSignature});
}
-
+
public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
return relativeToAbsolutePath(location, curDir);
}
-
+
public void setStoreLocation(String location, Job job) throws IOException {
conf = job.getConfiguration();
setLocationFromUri(location);
-//
-// Map<String,String> entries = AccumuloOutputFormat.getRelevantEntries(conf);
-//
-// Exception e = new Exception("setStoreLocation");
-// e.printStackTrace(System.out);
-// System.out.println(entries);
-//
-// for (String key : entries.keySet()) {
-// conf.unset(key);
-// }
-//
-// entries = AccumuloOutputFormat.getRelevantEntries(conf);
-// System.out.println(entries);
+
+ // TODO If Pig ever uses a MultipleOutputs-esque construct, this approach will fall apart
if (conf.get(AccumuloOutputFormat.class.getSimpleName() + ".configured") == null) {
AccumuloOutputFormat.setOutputInfo(conf, user, password.getBytes(), true, table);
AccumuloOutputFormat.setZooKeeperInstance(conf, inst, zookeepers);
AccumuloOutputFormat.setMaxLatency(conf, maxLatency);
AccumuloOutputFormat.setMaxMutationBufferSize(conf, maxMutationBufferSize);
AccumuloOutputFormat.setMaxWriteThreads(conf, maxWriteThreads);
-
+
LOG.info("Writing data to " + table);
-
+
configureOutputFormat(conf);
}
-
-// entries = AccumuloOutputFormat.getRelevantEntries(conf);
-// System.out.println(entries);
}
-
-// private boolean shouldSetInput(Map<String,String> configEntries) {
-// Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries);
-//
-// for (Map<String,String> group : groupedConfigEntries.values()) {
-// if (null != inst) {
-// if (!inst.equals(group.get(INPUT_PREFIX + ".instanceName"))) {
-// continue;
-// }
-// } else if (null != group.get(INPUT_PREFIX + ".instanceName")) {
-// continue;
-// }
-//
-// if (null != zookeepers) {
-// if (!zookeepers.equals(group.get(INPUT_PREFIX + ".zooKeepers"))) {
-// continue;
-// }
-// } else if (null != group.get(INPUT_PREFIX + ".zooKeepers")) {
-// continue;
-// }
-//
-// if (null != user) {
-// if (!user.equals(group.get(INPUT_PREFIX + ".username"))) {
-// continue;
-// }
-// } else if (null != group.get(INPUT_PREFIX + ".username")) {
-// continue;
-// }
-//
-// if (null != password) {
-// if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(INPUT_PREFIX + ".password"))) {
-// continue;
-// }
-// } else if (null != group.get(INPUT_PREFIX + ".password")) {
-// continue;
-// }
-//
-// if (null != table) {
-// if (!table.equals(group.get(INPUT_PREFIX + ".tablename"))) {
-// continue;
-// }
-// } else if (null != group.get(INPUT_PREFIX + ".tablename")) {
-// continue;
-// }
-//
-// if (null != authorizations) {
-// if (!authorizations.serialize().equals(group.get(INPUT_PREFIX + ".authorizations"))) {
-// continue;
-// }
-// } else if (null != group.get(INPUT_PREFIX + ".authorizations")) {
-// continue;
-// }
-//
-// String columnValues = group.get(INPUT_PREFIX + ".columns");
-// if (null != columnFamilyColumnQualifierPairs) {
-// StringBuilder expected = new StringBuilder(128);
-// for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
-// if (0 < expected.length()) {
-// expected.append(COMMA);
-// }
-//
-// expected.append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst()))));
-// if (column.getSecond() != null)
-// expected.append(COLON).append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond()))));
-// }
-//
-// if (!expected.toString().equals(columnValues)) {
-// continue;
-// }
-// } else if (null != columnValues) {
-// continue;
-// }
-//
-// Range expected = new Range(start, end);
-// String serializedRanges = group.get(INPUT_PREFIX + ".ranges");
-// if (null != serializedRanges) {
-// try {
-// // We currently only support serializing one Range into the Configuration from this Storage class
-// ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serializedRanges.getBytes()));
-// Range range = new Range();
-// range.readFields(new DataInputStream(bais));
-//
-// if (!expected.equals(range)) {
-// continue;
-// }
-// } catch (IOException e) {
-// // Got an exception, they don't match
-// continue;
-// }
-// }
-//
-// // We found a group of entries in the config which are (similar to) what
-// // we would have set.
-// return false;
-// }
-//
-// // We didn't find any entries that seemed to match, write the config
-// return true;
-// }
-//
-// private boolean shouldSetOutput(Map<String,String> configEntries) {
-// Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries);
-//
-// for (Map<String,String> group : groupedConfigEntries.values()) {
-// if (null != inst) {
-// if (!inst.equals(group.get(OUTPUT_PREFIX + ".instanceName"))) {
-// continue;
-// }
-// } else if (null != group.get(OUTPUT_PREFIX + ".instanceName")) {
-// continue;
-// }
-//
-// if (null != zookeepers) {
-// if (!zookeepers.equals(group.get(OUTPUT_PREFIX + ".zooKeepers"))) {
-// continue;
-// }
-// } else if (null != group.get(OUTPUT_PREFIX + ".zooKeepers")) {
-// continue;
-// }
-//
-// if (null != user) {
-// if (!user.equals(group.get(OUTPUT_PREFIX + ".username"))) {
-// continue;
-// }
-// } else if (null != group.get(OUTPUT_PREFIX + ".username")) {
-// continue;
-// }
-//
-// if (null != password) {
-// if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(OUTPUT_PREFIX + ".password"))) {
-// continue;
-// }
-// } else if (null != group.get(OUTPUT_PREFIX + ".password")) {
-// continue;
-// }
-//
-// if (null != table) {
-// if (!table.equals(group.get(OUTPUT_PREFIX + ".defaulttable"))) {
-// continue;
-// }
-// } else if (null != group.get(OUTPUT_PREFIX + ".defaulttable")) {
-// continue;
-// }
-//
-// String writeThreadsStr = group.get(OUTPUT_PREFIX + ".writethreads");
-// try {
-// if (null == writeThreadsStr || maxWriteThreads != Integer.parseInt(writeThreadsStr)) {
-// continue;
-// }
-// } catch (NumberFormatException e) {
-// // Wasn't a number, so it can't match what we were expecting
-// continue;
-// }
-//
-// String mutationBufferStr = group.get(OUTPUT_PREFIX + ".maxmemory");
-// try {
-// if (null == mutationBufferStr || maxMutationBufferSize != Long.parseLong(mutationBufferStr)) {
-// continue;
-// }
-// } catch (NumberFormatException e) {
-// // Wasn't a number, so it can't match what we were expecting
-// continue;
-// }
-//
-// String maxLatencyStr = group.get(OUTPUT_PREFIX + ".maxlatency");
-// try {
-// if (null == maxLatencyStr || maxLatency != Long.parseLong(maxLatencyStr)) {
-// continue;
-// }
-// } catch (NumberFormatException e) {
-// // Wasn't a number, so it can't match what we were expecting
-// continue;
-// }
-//
-// // We found a group of entries in the config which are (similar to) what
-// // we would have set.
-// return false;
-// }
-//
-// // We didn't find any entries that seemed to match, write the config
-// return true;
-// }
-//
-// private Map<Integer,Map<String,String>> permuteConfigEntries(Map<String,String> entries) {
-// Map<Integer,Map<String,String>> groupedEntries = new HashMap<Integer,Map<String,String>>();
-// for (Entry<String,String> entry : entries.entrySet()) {
-// final String key = entry.getKey(), value = entry.getValue();
-//
-// if (key.endsWith(SequencedFormatHelper.CONFIGURED_SEQUENCES)) {
-// continue;
-// }
-//
-// int index = key.lastIndexOf(PERIOD);
-// if (-1 != index) {
-// int group = Integer.parseInt(key.substring(index + 1));
-// String name = key.substring(0, index);
-//
-// Map<String,String> entriesInGroup = groupedEntries.get(group);
-// if (null == entriesInGroup) {
-// entriesInGroup = new HashMap<String,String>();
-// groupedEntries.put(group, entriesInGroup);
-// }
-//
-// entriesInGroup.put(name, value);
-// } else {
-// LOG.warn("Could not parse key: " + key);
-// }
-// }
-//
-// return groupedEntries;
-// }
-
+
@SuppressWarnings("rawtypes")
public OutputFormat getOutputFormat() {
return new AccumuloOutputFormat();
}
-
+
@SuppressWarnings({"rawtypes", "unchecked"})
public void prepareToWrite(RecordWriter writer) {
this.writer = writer;
}
-
+
public abstract Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException;
-
+
public void putNext(Tuple tuple) throws ExecException, IOException {
Collection<Mutation> muts = getMutations(tuple);
for (Mutation mut : muts) {
@@ -557,11 +327,11 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
}
}
}
-
+
public void cleanupOnFailure(String failure, Job job) {}
-
+
public void cleanupOnSuccess(String location, Job job) {}
-
+
@Override
public void checkSchema(ResourceSchema s) throws IOException {
if (!(caster instanceof LoadStoreCaster)) {
@@ -571,40 +341,40 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
schema = s;
getUDFProperties().setProperty(contextSignature + "_schema", ObjectSerializer.serialize(schema));
}
-
+
protected Text tupleToText(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
Object o = tuple.get(i);
byte type = schemaToType(o, i, fieldSchemas);
-
+
return objToText(o, type);
}
-
+
protected Text objectToText(Object o, ResourceFieldSchema fieldSchema) throws IOException {
byte type = schemaToType(o, fieldSchema);
-
+
return objToText(o, type);
}
-
+
protected byte schemaToType(Object o, ResourceFieldSchema fieldSchema) {
return (fieldSchema == null) ? DataType.findType(o) : fieldSchema.getType();
}
-
+
protected byte schemaToType(Object o, int i, ResourceFieldSchema[] fieldSchemas) {
return (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
}
-
+
protected byte[] tupleToBytes(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
Object o = tuple.get(i);
byte type = schemaToType(o, i, fieldSchemas);
-
+
return objToBytes(o, type);
-
+
}
-
+
protected long objToLong(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
Object o = tuple.get(i);
byte type = schemaToType(o, i, fieldSchemas);
-
+
switch (type) {
case DataType.LONG:
return (Long) o;
@@ -629,13 +399,13 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
case DataType.BIGINTEGER:
BigInteger bigintTimestamp = (BigInteger) o;
long longTimestamp = bigintTimestamp.longValue();
-
+
BigInteger recreatedTimestamp = BigInteger.valueOf(longTimestamp);
-
+
if (!recreatedTimestamp.equals(bigintTimestamp)) {
LOG.warn("Downcasting BigInteger into Long results in a change of the original value. Was " + bigintTimestamp + " but is now " + longTimestamp);
}
-
+
return longTimestamp;
case DataType.BIGDECIMAL:
BigDecimal bigdecimalTimestamp = (BigDecimal) o;
@@ -658,21 +428,21 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
default:
LOG.error("Could not convert " + o + " of class " + o.getClass() + " into long.");
throw new IOException("Could not convert " + o.getClass() + " into long");
-
+
}
}
-
+
protected Text objToText(Object o, byte type) throws IOException {
byte[] bytes = objToBytes(o, type);
-
+
if (null == bytes) {
LOG.warn("Creating empty text from null value");
return new Text();
}
-
+
return new Text(bytes);
}
-
+
@SuppressWarnings("unchecked")
protected byte[] objToBytes(Object o, byte type) throws IOException {
if (o == null)
@@ -700,12 +470,12 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
return caster.toBytes((Boolean) o);
case DataType.DATETIME:
return caster.toBytes((DateTime) o);
-
+
// The type conversion here is unchecked.
// Relying on DataType.findType to do the right thing.
case DataType.MAP:
return caster.toBytes((Map<String,Object>) o);
-
+
case DataType.NULL:
return null;
case DataType.TUPLE: