You are viewing a plain text version of this content. The canonical version is linked from the original archive page (the hyperlink is not preserved in this plain-text copy).
Posted to commits@accumulo.apache.org by el...@apache.org on 2013/11/09 04:37:26 UTC
[2/2] git commit: ACCUMULO-1783 Clean up now dead or unnecessary code.
ACCUMULO-1783 Clean up now dead or unnecessary code.
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/4160c161
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/4160c161
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/4160c161
Branch: refs/heads/ACCUMULO-1783
Commit: 4160c1615010a626beedc318fcaaaef06a258068
Parents: 63d29d4
Author: Josh Elser <el...@apache.org>
Authored: Fri Nov 8 21:27:41 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Fri Nov 8 21:27:41 2013 -0500
----------------------------------------------------------------------
.../accumulo/pig/AbstractAccumuloStorage.java | 398 ++++---------------
1 file changed, 84 insertions(+), 314 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/4160c161/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 5faf6c6..a829d4a 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -73,14 +73,14 @@ import org.joda.time.DateTime;
*/
public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreFuncInterface {
private static final Log LOG = LogFactory.getLog(AbstractAccumuloStorage.class);
-
- private static final String COLON = ":", COMMA = ",", PERIOD = ".";
- private static final String INPUT_PREFIX = AccumuloInputFormat.class.getSimpleName(), OUTPUT_PREFIX = AccumuloOutputFormat.class.getSimpleName();
-
+
+ private static final String COLON = ":", COMMA = ",";
+ private static final String INPUT_PREFIX = AccumuloInputFormat.class.getSimpleName();
+
private Configuration conf;
private RecordReader<Key,Value> reader;
private RecordWriter<Text,Mutation> writer;
-
+
String inst;
String zookeepers;
String user;
@@ -90,27 +90,27 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
String auths;
Authorizations authorizations;
List<Pair<Text,Text>> columnFamilyColumnQualifierPairs = new LinkedList<Pair<Text,Text>>();
-
+
String start = null;
String end = null;
-
+
int maxWriteThreads = 10;
long maxMutationBufferSize = 10 * 1000 * 1000;
int maxLatency = 10 * 1000;
-
+
protected LoadStoreCaster caster;
protected ResourceSchema schema;
protected String contextSignature = null;
-
+
public AbstractAccumuloStorage() {}
-
+
@Override
public Tuple getNext() throws IOException {
try {
// load the next pair
if (!reader.nextKeyValue())
return null;
-
+
Key key = (Key) reader.getCurrentKey();
Value value = (Value) reader.getCurrentValue();
assert key != null && value != null;
@@ -119,21 +119,21 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
throw new IOException(e.getMessage());
}
}
-
+
protected abstract Tuple getTuple(Key key, Value value) throws IOException;
-
+
@Override
@SuppressWarnings("rawtypes")
public InputFormat getInputFormat() {
return new AccumuloInputFormat();
}
-
+
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public void prepareToRead(RecordReader reader, PigSplit split) {
this.reader = reader;
}
-
+
private void setLocationFromUri(String location) throws IOException {
// ex:
// accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&fetch_columns=col1:cq1,col2:cq2&start=abc&end=z
@@ -172,13 +172,13 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
String[] parts = urlParts[0].split("/+");
table = parts[1];
tableName = new Text(table);
-
+
if (auths == null || auths.equals("")) {
authorizations = new Authorizations();
} else {
authorizations = new Authorizations(auths.split(COMMA));
}
-
+
if (!StringUtils.isEmpty(columns)) {
for (String cfCq : columns.split(COMMA)) {
if (cfCq.contains(COLON)) {
@@ -189,7 +189,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
}
}
}
-
+
} catch (Exception e) {
throw new IOException(
"Expected 'accumulo://<table>[?instance=<instanceName>&user=<user>&password=<password>&zookeepers=<zookeepers>&auths=<authorizations>&"
@@ -197,356 +197,126 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
+ e.getMessage());
}
}
-
+
protected RecordWriter<Text,Mutation> getWriter() {
return writer;
}
-
+
protected Map<String,String> getInputFormatEntries(Configuration conf) {
return getEntries(conf, INPUT_PREFIX);
}
-
+
protected Map<String,String> getEntries(Configuration conf, String prefix) {
Map<String,String> entries = new HashMap<String,String>();
-
+
for (Entry<String,String> entry : conf) {
String key = entry.getKey();
if (key.startsWith(prefix)) {
entries.put(key, entry.getValue());
}
}
-
+
return entries;
}
-
-
+
@Override
public void setLocation(String location, Job job) throws IOException {
conf = job.getConfiguration();
setLocationFromUri(location);
-
+
Map<String,String> entries = getInputFormatEntries(conf);
-
- Exception e = new Exception("setLocation");
- e.printStackTrace(System.out);
- System.out.println(entries);
-
+
for (String key : entries.keySet()) {
conf.unset(key);
}
-
- entries = getInputFormatEntries(conf);
- System.out.println(entries);
-
+
AccumuloInputFormat.setInputInfo(conf, user, password.getBytes(), table, authorizations);
AccumuloInputFormat.setZooKeeperInstance(conf, inst, zookeepers);
if (columnFamilyColumnQualifierPairs.size() > 0) {
LOG.info("columns: " + columnFamilyColumnQualifierPairs);
AccumuloInputFormat.fetchColumns(conf, columnFamilyColumnQualifierPairs);
}
-
+
Collection<Range> ranges = Collections.singleton(new Range(start, end));
-
+
LOG.info("Scanning Accumulo for " + ranges + " for table " + table);
-
+
AccumuloInputFormat.setRanges(conf, ranges);
-
+
configureInputFormat(conf);
-
- entries = getInputFormatEntries(conf);
- System.out.println(entries);
- }
-
- protected void configureInputFormat(Configuration conf) {
-
}
-
- protected void configureOutputFormat(Configuration conf) {
-
- }
-
+
+ /**
+ * Method to allow specific implementations to add more elements to the Configuration for reading data from Accumulo.
+ *
+ * @param conf
+ */
+ protected void configureInputFormat(Configuration conf) {}
+
+ /**
+ * Method to allow specific implementations to add more elements to the Configuration for writing data to Accumulo.
+ *
+ * @param conf
+ */
+ protected void configureOutputFormat(Configuration conf) {}
+
@Override
public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
return location;
}
-
+
@Override
public void setUDFContextSignature(String signature) {
this.contextSignature = signature;
}
-
+
/* StoreFunc methods */
public void setStoreFuncUDFContextSignature(String signature) {
this.contextSignature = signature;
-
+
}
-
+
/**
* Returns UDFProperties based on <code>contextSignature</code>.
*/
protected Properties getUDFProperties() {
return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] {contextSignature});
}
-
+
public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
return relativeToAbsolutePath(location, curDir);
}
-
+
public void setStoreLocation(String location, Job job) throws IOException {
conf = job.getConfiguration();
setLocationFromUri(location);
-//
-// Map<String,String> entries = AccumuloOutputFormat.getRelevantEntries(conf);
-//
-// Exception e = new Exception("setStoreLocation");
-// e.printStackTrace(System.out);
-// System.out.println(entries);
-//
-// for (String key : entries.keySet()) {
-// conf.unset(key);
-// }
-//
-// entries = AccumuloOutputFormat.getRelevantEntries(conf);
-// System.out.println(entries);
+
+ // TODO If Pig ever uses a MultipleOutputs-esque construct, this approach will fall apart
if (conf.get(AccumuloOutputFormat.class.getSimpleName() + ".configured") == null) {
AccumuloOutputFormat.setOutputInfo(conf, user, password.getBytes(), true, table);
AccumuloOutputFormat.setZooKeeperInstance(conf, inst, zookeepers);
AccumuloOutputFormat.setMaxLatency(conf, maxLatency);
AccumuloOutputFormat.setMaxMutationBufferSize(conf, maxMutationBufferSize);
AccumuloOutputFormat.setMaxWriteThreads(conf, maxWriteThreads);
-
+
LOG.info("Writing data to " + table);
-
+
configureOutputFormat(conf);
}
-
-// entries = AccumuloOutputFormat.getRelevantEntries(conf);
-// System.out.println(entries);
}
-
-// private boolean shouldSetInput(Map<String,String> configEntries) {
-// Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries);
-//
-// for (Map<String,String> group : groupedConfigEntries.values()) {
-// if (null != inst) {
-// if (!inst.equals(group.get(INPUT_PREFIX + ".instanceName"))) {
-// continue;
-// }
-// } else if (null != group.get(INPUT_PREFIX + ".instanceName")) {
-// continue;
-// }
-//
-// if (null != zookeepers) {
-// if (!zookeepers.equals(group.get(INPUT_PREFIX + ".zooKeepers"))) {
-// continue;
-// }
-// } else if (null != group.get(INPUT_PREFIX + ".zooKeepers")) {
-// continue;
-// }
-//
-// if (null != user) {
-// if (!user.equals(group.get(INPUT_PREFIX + ".username"))) {
-// continue;
-// }
-// } else if (null != group.get(INPUT_PREFIX + ".username")) {
-// continue;
-// }
-//
-// if (null != password) {
-// if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(INPUT_PREFIX + ".password"))) {
-// continue;
-// }
-// } else if (null != group.get(INPUT_PREFIX + ".password")) {
-// continue;
-// }
-//
-// if (null != table) {
-// if (!table.equals(group.get(INPUT_PREFIX + ".tablename"))) {
-// continue;
-// }
-// } else if (null != group.get(INPUT_PREFIX + ".tablename")) {
-// continue;
-// }
-//
-// if (null != authorizations) {
-// if (!authorizations.serialize().equals(group.get(INPUT_PREFIX + ".authorizations"))) {
-// continue;
-// }
-// } else if (null != group.get(INPUT_PREFIX + ".authorizations")) {
-// continue;
-// }
-//
-// String columnValues = group.get(INPUT_PREFIX + ".columns");
-// if (null != columnFamilyColumnQualifierPairs) {
-// StringBuilder expected = new StringBuilder(128);
-// for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
-// if (0 < expected.length()) {
-// expected.append(COMMA);
-// }
-//
-// expected.append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst()))));
-// if (column.getSecond() != null)
-// expected.append(COLON).append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond()))));
-// }
-//
-// if (!expected.toString().equals(columnValues)) {
-// continue;
-// }
-// } else if (null != columnValues) {
-// continue;
-// }
-//
-// Range expected = new Range(start, end);
-// String serializedRanges = group.get(INPUT_PREFIX + ".ranges");
-// if (null != serializedRanges) {
-// try {
-// // We currently only support serializing one Range into the Configuration from this Storage class
-// ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serializedRanges.getBytes()));
-// Range range = new Range();
-// range.readFields(new DataInputStream(bais));
-//
-// if (!expected.equals(range)) {
-// continue;
-// }
-// } catch (IOException e) {
-// // Got an exception, they don't match
-// continue;
-// }
-// }
-//
-// // We found a group of entries in the config which are (similar to) what
-// // we would have set.
-// return false;
-// }
-//
-// // We didn't find any entries that seemed to match, write the config
-// return true;
-// }
-//
-// private boolean shouldSetOutput(Map<String,String> configEntries) {
-// Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries);
-//
-// for (Map<String,String> group : groupedConfigEntries.values()) {
-// if (null != inst) {
-// if (!inst.equals(group.get(OUTPUT_PREFIX + ".instanceName"))) {
-// continue;
-// }
-// } else if (null != group.get(OUTPUT_PREFIX + ".instanceName")) {
-// continue;
-// }
-//
-// if (null != zookeepers) {
-// if (!zookeepers.equals(group.get(OUTPUT_PREFIX + ".zooKeepers"))) {
-// continue;
-// }
-// } else if (null != group.get(OUTPUT_PREFIX + ".zooKeepers")) {
-// continue;
-// }
-//
-// if (null != user) {
-// if (!user.equals(group.get(OUTPUT_PREFIX + ".username"))) {
-// continue;
-// }
-// } else if (null != group.get(OUTPUT_PREFIX + ".username")) {
-// continue;
-// }
-//
-// if (null != password) {
-// if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(OUTPUT_PREFIX + ".password"))) {
-// continue;
-// }
-// } else if (null != group.get(OUTPUT_PREFIX + ".password")) {
-// continue;
-// }
-//
-// if (null != table) {
-// if (!table.equals(group.get(OUTPUT_PREFIX + ".defaulttable"))) {
-// continue;
-// }
-// } else if (null != group.get(OUTPUT_PREFIX + ".defaulttable")) {
-// continue;
-// }
-//
-// String writeThreadsStr = group.get(OUTPUT_PREFIX + ".writethreads");
-// try {
-// if (null == writeThreadsStr || maxWriteThreads != Integer.parseInt(writeThreadsStr)) {
-// continue;
-// }
-// } catch (NumberFormatException e) {
-// // Wasn't a number, so it can't match what we were expecting
-// continue;
-// }
-//
-// String mutationBufferStr = group.get(OUTPUT_PREFIX + ".maxmemory");
-// try {
-// if (null == mutationBufferStr || maxMutationBufferSize != Long.parseLong(mutationBufferStr)) {
-// continue;
-// }
-// } catch (NumberFormatException e) {
-// // Wasn't a number, so it can't match what we were expecting
-// continue;
-// }
-//
-// String maxLatencyStr = group.get(OUTPUT_PREFIX + ".maxlatency");
-// try {
-// if (null == maxLatencyStr || maxLatency != Long.parseLong(maxLatencyStr)) {
-// continue;
-// }
-// } catch (NumberFormatException e) {
-// // Wasn't a number, so it can't match what we were expecting
-// continue;
-// }
-//
-// // We found a group of entries in the config which are (similar to) what
-// // we would have set.
-// return false;
-// }
-//
-// // We didn't find any entries that seemed to match, write the config
-// return true;
-// }
-//
-// private Map<Integer,Map<String,String>> permuteConfigEntries(Map<String,String> entries) {
-// Map<Integer,Map<String,String>> groupedEntries = new HashMap<Integer,Map<String,String>>();
-// for (Entry<String,String> entry : entries.entrySet()) {
-// final String key = entry.getKey(), value = entry.getValue();
-//
-// if (key.endsWith(SequencedFormatHelper.CONFIGURED_SEQUENCES)) {
-// continue;
-// }
-//
-// int index = key.lastIndexOf(PERIOD);
-// if (-1 != index) {
-// int group = Integer.parseInt(key.substring(index + 1));
-// String name = key.substring(0, index);
-//
-// Map<String,String> entriesInGroup = groupedEntries.get(group);
-// if (null == entriesInGroup) {
-// entriesInGroup = new HashMap<String,String>();
-// groupedEntries.put(group, entriesInGroup);
-// }
-//
-// entriesInGroup.put(name, value);
-// } else {
-// LOG.warn("Could not parse key: " + key);
-// }
-// }
-//
-// return groupedEntries;
-// }
-
+
@SuppressWarnings("rawtypes")
public OutputFormat getOutputFormat() {
return new AccumuloOutputFormat();
}
-
+
@SuppressWarnings({"rawtypes", "unchecked"})
public void prepareToWrite(RecordWriter writer) {
this.writer = writer;
}
-
+
public abstract Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException;
-
+
public void putNext(Tuple tuple) throws ExecException, IOException {
Collection<Mutation> muts = getMutations(tuple);
for (Mutation mut : muts) {
@@ -557,11 +327,11 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
}
}
}
-
+
public void cleanupOnFailure(String failure, Job job) {}
-
+
public void cleanupOnSuccess(String location, Job job) {}
-
+
@Override
public void checkSchema(ResourceSchema s) throws IOException {
if (!(caster instanceof LoadStoreCaster)) {
@@ -571,40 +341,40 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
schema = s;
getUDFProperties().setProperty(contextSignature + "_schema", ObjectSerializer.serialize(schema));
}
-
+
protected Text tupleToText(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
Object o = tuple.get(i);
byte type = schemaToType(o, i, fieldSchemas);
-
+
return objToText(o, type);
}
-
+
protected Text objectToText(Object o, ResourceFieldSchema fieldSchema) throws IOException {
byte type = schemaToType(o, fieldSchema);
-
+
return objToText(o, type);
}
-
+
protected byte schemaToType(Object o, ResourceFieldSchema fieldSchema) {
return (fieldSchema == null) ? DataType.findType(o) : fieldSchema.getType();
}
-
+
protected byte schemaToType(Object o, int i, ResourceFieldSchema[] fieldSchemas) {
return (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
}
-
+
protected byte[] tupleToBytes(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
Object o = tuple.get(i);
byte type = schemaToType(o, i, fieldSchemas);
-
+
return objToBytes(o, type);
-
+
}
-
+
protected long objToLong(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
Object o = tuple.get(i);
byte type = schemaToType(o, i, fieldSchemas);
-
+
switch (type) {
case DataType.LONG:
return (Long) o;
@@ -629,13 +399,13 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
case DataType.BIGINTEGER:
BigInteger bigintTimestamp = (BigInteger) o;
long longTimestamp = bigintTimestamp.longValue();
-
+
BigInteger recreatedTimestamp = BigInteger.valueOf(longTimestamp);
-
+
if (!recreatedTimestamp.equals(bigintTimestamp)) {
LOG.warn("Downcasting BigInteger into Long results in a change of the original value. Was " + bigintTimestamp + " but is now " + longTimestamp);
}
-
+
return longTimestamp;
case DataType.BIGDECIMAL:
BigDecimal bigdecimalTimestamp = (BigDecimal) o;
@@ -658,21 +428,21 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
default:
LOG.error("Could not convert " + o + " of class " + o.getClass() + " into long.");
throw new IOException("Could not convert " + o.getClass() + " into long");
-
+
}
}
-
+
protected Text objToText(Object o, byte type) throws IOException {
byte[] bytes = objToBytes(o, type);
-
+
if (null == bytes) {
LOG.warn("Creating empty text from null value");
return new Text();
}
-
+
return new Text(bytes);
}
-
+
@SuppressWarnings("unchecked")
protected byte[] objToBytes(Object o, byte type) throws IOException {
if (o == null)
@@ -700,12 +470,12 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
return caster.toBytes((Boolean) o);
case DataType.DATETIME:
return caster.toBytes((DateTime) o);
-
+
// The type conversion here is unchecked.
// Relying on DataType.findType to do the right thing.
case DataType.MAP:
return caster.toBytes((Map<String,Object>) o);
-
+
case DataType.NULL:
return null;
case DataType.TUPLE: