You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2013/10/19 01:53:18 UTC
[11/13] git commit: ACCUMULO-1783 Update accumulo dependency to 1.5.0 and reformat code
ACCUMULO-1783 Update accumulo dependency to 1.5.0 and reformat code
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/6d494666
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/6d494666
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/6d494666
Branch: refs/heads/1.5
Commit: 6d4946661b722205b824bd72e174cdc10ecdd165
Parents: 9d9c5fa
Author: Josh Elser <el...@apache.org>
Authored: Thu Oct 17 15:27:59 2013 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 17 15:27:59 2013 -0400
----------------------------------------------------------------------
pom.xml | 101 ++--
.../accumulo/pig/AbstractAccumuloStorage.java | 472 +++++++++----------
.../apache/accumulo/pig/AccumuloStorage.java | 107 ++---
.../accumulo/pig/AccumuloWholeRowStorage.java | 122 +++--
.../java/org/apache/accumulo/pig/Utils.java | 68 ++-
.../pig/AbstractAccumuloStorageTest.java | 229 +++++----
.../accumulo/pig/AccumuloStorageTest.java | 139 +++---
.../pig/AccumuloWholeRowStorageTest.java | 218 ++++-----
.../java/org/apache/accumulo/pig/TestUtils.java | 88 ++--
9 files changed, 718 insertions(+), 826 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/6d494666/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0b36bbf..097d8de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,20 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
@@ -32,62 +26,49 @@
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
-
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.1.1</version>
</dependency>
-
<dependency>
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>
<version>0.11.0</version>
</dependency>
-
<!-- Needed by Apache Pig 0.11.0 -->
<dependency>
- <groupId>joda-time</groupId>
- <artifactId>joda-time</artifactId>
- <version>1.6</version>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>1.6</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-core</artifactId>
+ <version>1.5.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.3.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <version>0.9.0</version>
</dependency>
-
- <dependency>
- <groupId>org.apache.accumulo</groupId>
- <artifactId>accumulo-core</artifactId>
- <version>1.5.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.3.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.thrift</groupId>
- <artifactId>libthrift</artifactId>
- <version>0.9.0</version>
- </dependency>
-<!--
- <dependency>
- <groupId>org.apache.accumulo</groupId>
- <artifactId>cloudtrace</artifactId>
- <version>1.4.0</version>
- </dependency>
--->
-
-
</dependencies>
-<build>
- <plugins>
-<plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>1.6</source>
- <target>1.6</target>
- </configuration>
-</plugin>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
@@ -97,7 +78,7 @@
</descriptorRefs>
<archive>
<manifest>
- <mainClass/>
+ <mainClass />
</manifest>
</archive>
</configuration>
@@ -113,8 +94,8 @@
</plugin>
- </plugins>
-</build>
+ </plugins>
+ </build>
</project>
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/6d494666/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 82329d3..9612d15 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -54,270 +54,236 @@ import org.apache.pig.data.Tuple;
/**
* A LoadStoreFunc for retrieving data from and storing data to Accumulo
- *
- * A Key/Val pair will be returned as tuples: (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArray, timestamp is a long.
*
- * Tuples can be written in 2 forms:
- * (key, colfam, colqual, colvis, value)
- * OR
- * (key, colfam, colqual, value)
+ * A Key/Val pair will be returned as tuples: (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArray, timestamp is a
+ * long.
+ *
+ * Tuples can be written in 2 forms: (key, colfam, colqual, colvis, value) OR (key, colfam, colqual, value)
*
*/
-public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreFuncInterface
-{
- private static final Log LOG = LogFactory.getLog(AbstractAccumuloStorage.class);
-
- private Configuration conf;
- private RecordReader<Key, Value> reader;
- private RecordWriter<Text, Mutation> writer;
-
- String inst;
- String zookeepers;
- String user;
- String password;
- String table;
- Text tableName;
- String auths;
- Authorizations authorizations;
- List<Pair<Text, Text>> columnFamilyColumnQualifierPairs = new LinkedList<Pair<Text,Text>>();
-
- String start = null;
- String end = null;
-
- int maxWriteThreads = 10;
- long maxMutationBufferSize = 10*1000*1000;
- int maxLatency = 10*1000;
-
- public AbstractAccumuloStorage(){}
-
- @Override
- public Tuple getNext() throws IOException
- {
- try
- {
- // load the next pair
- if (!reader.nextKeyValue())
- return null;
-
- Key key = (Key)reader.getCurrentKey();
- Value value = (Value)reader.getCurrentValue();
- assert key != null && value != null;
- return getTuple(key, value);
- }
- catch (InterruptedException e)
- {
- throw new IOException(e.getMessage());
- }
- }
-
- protected abstract Tuple getTuple(Key key, Value value) throws IOException;
-
-
- @Override
- public InputFormat getInputFormat()
- {
- return new AccumuloInputFormat();
+public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreFuncInterface {
+ private static final Log LOG = LogFactory.getLog(AbstractAccumuloStorage.class);
+
+ private Configuration conf;
+ private RecordReader<Key,Value> reader;
+ private RecordWriter<Text,Mutation> writer;
+
+ String inst;
+ String zookeepers;
+ String user;
+ String password;
+ String table;
+ Text tableName;
+ String auths;
+ Authorizations authorizations;
+ List<Pair<Text,Text>> columnFamilyColumnQualifierPairs = new LinkedList<Pair<Text,Text>>();
+
+ String start = null;
+ String end = null;
+
+ int maxWriteThreads = 10;
+ long maxMutationBufferSize = 10 * 1000 * 1000;
+ int maxLatency = 10 * 1000;
+
+ public AbstractAccumuloStorage() {}
+
+ @Override
+ public Tuple getNext() throws IOException {
+ try {
+ // load the next pair
+ if (!reader.nextKeyValue())
+ return null;
+
+ Key key = (Key) reader.getCurrentKey();
+ Value value = (Value) reader.getCurrentValue();
+ assert key != null && value != null;
+ return getTuple(key, value);
+ } catch (InterruptedException e) {
+ throw new IOException(e.getMessage());
}
-
- @Override
- public void prepareToRead(RecordReader reader, PigSplit split)
- {
- this.reader = reader;
- }
-
- private void setLocationFromUri(String location) throws IOException
- {
- // ex: accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&columns=col1|cq1,col2|cq2&start=abc&end=z
- String columns = "";
- try
- {
- if (!location.startsWith("accumulo://"))
- throw new Exception("Bad scheme.");
- String[] urlParts = location.split("\\?");
- if (urlParts.length > 1)
- {
- for (String param : urlParts[1].split("&"))
- {
- String[] pair = param.split("=");
- if (pair[0].equals("instance"))
- inst = pair[1];
- else if (pair[0].equals("user"))
- user = pair[1];
- else if (pair[0].equals("password"))
- password = pair[1];
- else if (pair[0].equals("zookeepers"))
- zookeepers = pair[1];
- else if (pair[0].equals("auths"))
- auths = pair[1];
- else if (pair[0].equals("columns"))
- columns = pair[1];
- else if (pair[0].equals("start"))
- start = pair[1];
- else if (pair[0].equals("end"))
- end = pair[1];
- else if (pair[0].equals("write_buffer_size_bytes"))
- maxMutationBufferSize = Long.parseLong(pair[1]);
- else if (pair[0].equals("write_threads"))
- maxWriteThreads = Integer.parseInt(pair[1]);
- else if (pair[0].equals("write_latency_ms"))
- maxLatency = Integer.parseInt(pair[1]);
- }
- }
- String[] parts = urlParts[0].split("/+");
- table = parts[1];
- tableName = new Text(table);
-
- if(auths == null || auths.equals(""))
- {
- authorizations = new Authorizations();
- }
- else
- {
- authorizations = new Authorizations(auths.split(","));
- }
-
- if(!columns.equals("")){
- for(String cfCq : columns.split(","))
- {
- if(cfCq.contains("|"))
- {
- String[] c = cfCq.split("\\|");
- columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>(new Text(c[0]), new Text(c[1])));
- }
- else
- {
- columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>(new Text(cfCq), null));
- }
- }
- }
-
+ }
+
+ protected abstract Tuple getTuple(Key key, Value value) throws IOException;
+
+ @Override
+ public InputFormat getInputFormat() {
+ return new AccumuloInputFormat();
+ }
+
+ @Override
+ public void prepareToRead(RecordReader reader, PigSplit split) {
+ this.reader = reader;
+ }
+
+ private void setLocationFromUri(String location) throws IOException {
+ // ex:
+ // accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&columns=col1|cq1,col2|cq2&start=abc&end=z
+ String columns = "";
+ try {
+ if (!location.startsWith("accumulo://"))
+ throw new Exception("Bad scheme.");
+ String[] urlParts = location.split("\\?");
+ if (urlParts.length > 1) {
+ for (String param : urlParts[1].split("&")) {
+ String[] pair = param.split("=");
+ if (pair[0].equals("instance"))
+ inst = pair[1];
+ else if (pair[0].equals("user"))
+ user = pair[1];
+ else if (pair[0].equals("password"))
+ password = pair[1];
+ else if (pair[0].equals("zookeepers"))
+ zookeepers = pair[1];
+ else if (pair[0].equals("auths"))
+ auths = pair[1];
+ else if (pair[0].equals("columns"))
+ columns = pair[1];
+ else if (pair[0].equals("start"))
+ start = pair[1];
+ else if (pair[0].equals("end"))
+ end = pair[1];
+ else if (pair[0].equals("write_buffer_size_bytes"))
+ maxMutationBufferSize = Long.parseLong(pair[1]);
+ else if (pair[0].equals("write_threads"))
+ maxWriteThreads = Integer.parseInt(pair[1]);
+ else if (pair[0].equals("write_latency_ms"))
+ maxLatency = Integer.parseInt(pair[1]);
}
- catch (Exception e)
- {
- throw new IOException("Expected 'accumulo://<table>[?instance=<instanceName>&user=<user>&password=<password>&zookeepers=<zookeepers>&auths=<authorizations>&" +
- "[start=startRow,end=endRow,columns=[cf1|cq1,cf2|cq2,...],write_buffer_size_bytes=10000000,write_threads=10,write_latency_ms=30000]]': " + e.getMessage());
+ }
+ String[] parts = urlParts[0].split("/+");
+ table = parts[1];
+ tableName = new Text(table);
+
+ if (auths == null || auths.equals("")) {
+ authorizations = new Authorizations();
+ } else {
+ authorizations = new Authorizations(auths.split(","));
+ }
+
+ if (!columns.equals("")) {
+ for (String cfCq : columns.split(",")) {
+ if (cfCq.contains("|")) {
+ String[] c = cfCq.split("\\|");
+ columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text(c[0]), new Text(c[1])));
+ } else {
+ columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text(cfCq), null));
+ }
}
+ }
+
+ } catch (Exception e) {
+ throw new IOException(
+ "Expected 'accumulo://<table>[?instance=<instanceName>&user=<user>&password=<password>&zookeepers=<zookeepers>&auths=<authorizations>&"
+ + "[start=startRow,end=endRow,columns=[cf1|cq1,cf2|cq2,...],write_buffer_size_bytes=10000000,write_threads=10,write_latency_ms=30000]]': "
+ + e.getMessage());
}
+ }
+
+ protected RecordWriter<Text,Mutation> getWriter() {
+ return writer;
+ }
+
+ @Override
+ public void setLocation(String location, Job job) throws IOException {
+ conf = job.getConfiguration();
+ setLocationFromUri(location);
- protected RecordWriter<Text, Mutation> getWriter() {
- return writer;
- }
-
- @Override
- public void setLocation(String location, Job job) throws IOException
- {
- conf = job.getConfiguration();
- setLocationFromUri(location);
-
- if(!ConfiguratorBase.isConnectorInfoSet(AccumuloInputFormat.class, conf))
- {
- AccumuloInputFormat.setInputTableName(job, table);
- AccumuloInputFormat.setScanAuthorizations(job, authorizations);
- AccumuloInputFormat.setZooKeeperInstance(job, inst, zookeepers);
-
- try {
- AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes()));
- } catch (AccumuloSecurityException e) {
- throw new IOException(e);
- }
-
- if(columnFamilyColumnQualifierPairs.size() > 0)
- {
- LOG.info("columns: "+columnFamilyColumnQualifierPairs);
- AccumuloInputFormat.fetchColumns(job, columnFamilyColumnQualifierPairs);
- }
-
- AccumuloInputFormat.setRanges(job, Collections.singleton(new Range(start, end)));
- configureInputFormat(conf);
- }
+ if (!ConfiguratorBase.isConnectorInfoSet(AccumuloInputFormat.class, conf)) {
+ AccumuloInputFormat.setInputTableName(job, table);
+ AccumuloInputFormat.setScanAuthorizations(job, authorizations);
+ AccumuloInputFormat.setZooKeeperInstance(job, inst, zookeepers);
+
+ try {
+ AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes()));
+ } catch (AccumuloSecurityException e) {
+ throw new IOException(e);
+ }
+
+ if (columnFamilyColumnQualifierPairs.size() > 0) {
+ LOG.info("columns: " + columnFamilyColumnQualifierPairs);
+ AccumuloInputFormat.fetchColumns(job, columnFamilyColumnQualifierPairs);
+ }
+
+ AccumuloInputFormat.setRanges(job, Collections.singleton(new Range(start, end)));
+ configureInputFormat(conf);
}
-
- protected void configureInputFormat(Configuration conf){}
-
- protected void configureOutputFormat(Configuration conf){}
+ }
+
+ protected void configureInputFormat(Configuration conf) {}
+
+ protected void configureOutputFormat(Configuration conf) {}
+
+ @Override
+ public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
+ return location;
+ }
+
+ @Override
+ public void setUDFContextSignature(String signature) {}
+
+ /* StoreFunc methods */
+ public void setStoreFuncUDFContextSignature(String signature) {}
+
+ public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
+ return relativeToAbsolutePath(location, curDir);
+ }
+
+ public void setStoreLocation(String location, Job job) throws IOException {
+ conf = job.getConfiguration();
+ setLocationFromUri(location);
- @Override
- public String relativeToAbsolutePath(String location, Path curDir) throws IOException
- {
- return location;
- }
-
- @Override
- public void setUDFContextSignature(String signature){}
-
- /* StoreFunc methods */
- public void setStoreFuncUDFContextSignature(String signature){}
-
- public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
- {
- return relativeToAbsolutePath(location, curDir);
- }
-
- public void setStoreLocation(String location, Job job) throws IOException
- {
- conf = job.getConfiguration();
- setLocationFromUri(location);
-
- try
- {
- if(!ConfiguratorBase.isConnectorInfoSet(AccumuloOutputFormat.class, conf))
- {
- BatchWriterConfig bwConfig = new BatchWriterConfig();
- bwConfig.setMaxLatency(maxLatency, TimeUnit.MILLISECONDS);
- bwConfig.setMaxMemory(maxMutationBufferSize);
- bwConfig.setMaxWriteThreads(maxWriteThreads);
- AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
-
- AccumuloOutputFormat.setZooKeeperInstance(job, inst, zookeepers);
- AccumuloOutputFormat.setDefaultTableName(job, table);
- AccumuloOutputFormat.setCreateTables(job, true);
-
- try {
- AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes()));
- } catch (AccumuloSecurityException e) {
- throw new IOException(e);
- }
-
- configureOutputFormat(conf);
- }
- }
- catch(java.lang.IllegalStateException e1){
- e1.printStackTrace();
+ try {
+ if (!ConfiguratorBase.isConnectorInfoSet(AccumuloOutputFormat.class, conf)) {
+ BatchWriterConfig bwConfig = new BatchWriterConfig();
+ bwConfig.setMaxLatency(maxLatency, TimeUnit.MILLISECONDS);
+ bwConfig.setMaxMemory(maxMutationBufferSize);
+ bwConfig.setMaxWriteThreads(maxWriteThreads);
+ AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
+
+ AccumuloOutputFormat.setZooKeeperInstance(job, inst, zookeepers);
+ AccumuloOutputFormat.setDefaultTableName(job, table);
+ AccumuloOutputFormat.setCreateTables(job, true);
+
+ try {
+ AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes()));
+ } catch (AccumuloSecurityException e) {
+ throw new IOException(e);
}
+
+ configureOutputFormat(conf);
+ }
+ } catch (java.lang.IllegalStateException e1) {
+ e1.printStackTrace();
}
-
- public OutputFormat getOutputFormat()
- {
- return new AccumuloOutputFormat();
- }
-
- public void checkSchema(ResourceSchema schema) throws IOException
- {
- // we don't care about types, they all get casted to ByteBuffers
- }
-
- public void prepareToWrite(RecordWriter writer)
- {
- this.writer = writer;
- }
-
- public abstract Collection<Mutation> getMutations(Tuple tuple)throws ExecException, IOException;
-
- public void putNext(Tuple tuple) throws ExecException, IOException
- {
- Collection<Mutation> muts = getMutations(tuple);
- for(Mutation mut : muts)
- {
- try {
- getWriter().write(tableName, mut);
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
+ }
+
+ public OutputFormat getOutputFormat() {
+ return new AccumuloOutputFormat();
+ }
+
+ public void checkSchema(ResourceSchema schema) throws IOException {
+ // we don't care about types, they all get casted to ByteBuffers
+ }
+
+ public void prepareToWrite(RecordWriter writer) {
+ this.writer = writer;
+ }
+
+ public abstract Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException;
+
+ public void putNext(Tuple tuple) throws ExecException, IOException {
+ Collection<Mutation> muts = getMutations(tuple);
+ for (Mutation mut : muts) {
+ try {
+ getWriter().write(tableName, mut);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
}
-
- public void cleanupOnFailure(String failure, Job job){}
-
- @Override
- public void cleanupOnSuccess(String location, Job job) throws IOException {}
+ }
+
+ public void cleanupOnFailure(String failure, Job job) {}
+
+ @Override
+ public void cleanupOnSuccess(String location, Job job) throws IOException {}
}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/6d494666/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index 5b146d6..4de1618 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -34,65 +34,56 @@ import org.apache.pig.data.TupleFactory;
/**
* A LoadStoreFunc for retrieving data from and storing data to Accumulo
- *
- * A Key/Val pair will be returned as tuples: (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArray, timestamp is a long.
*
- * Tuples can be written in 2 forms:
- * (key, colfam, colqual, colvis, value)
- * OR
- * (key, colfam, colqual, value)
+ * A Key/Val pair will be returned as tuples: (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArray, timestamp is a
+ * long.
+ *
+ * Tuples can be written in 2 forms: (key, colfam, colqual, colvis, value) OR (key, colfam, colqual, value)
*
*/
-public class AccumuloStorage extends AbstractAccumuloStorage
-{
- private static final Log LOG = LogFactory.getLog(AccumuloStorage.class);
-
- public AccumuloStorage(){}
-
- @Override
- protected Tuple getTuple(Key key, Value value) throws IOException {
- // and wrap it in a tuple
- Tuple tuple = TupleFactory.getInstance().newTuple(6);
- tuple.set(0, new DataByteArray(key.getRow().getBytes()));
- tuple.set(1, new DataByteArray(key.getColumnFamily().getBytes()));
- tuple.set(2, new DataByteArray(key.getColumnQualifier().getBytes()));
- tuple.set(3, new DataByteArray(key.getColumnVisibility().getBytes()));
- tuple.set(4, new Long(key.getTimestamp()));
- tuple.set(5, new DataByteArray(value.get()));
- return tuple;
- }
-
- @Override
- public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
-
- try {
- Mutation mut = new Mutation(Utils.objToText(tuple.get(0)));
- Text cf = Utils.objToText(tuple.get(1));
- Text cq = Utils.objToText(tuple.get(2));
-
- if(tuple.size() > 4)
- {
- Text cv = Utils.objToText(tuple.get(3));
- Value val = new Value(Utils.objToBytes(tuple.get(4)));
- if(cv.getLength() == 0)
- {
- mut.put(cf, cq, val);
- }
- else
- {
- mut.put(cf, cq, new ColumnVisibility(cv), val);
- }
- }
- else
- {
- Value val = new Value(Utils.objToBytes(tuple.get(3)));
- mut.put(cf, cq, val);
- }
-
- return Collections.singleton(mut);
- } catch (IOException e) {
- System.err.println("Error on Tuple: "+tuple);
- throw e;
- }
- }
+public class AccumuloStorage extends AbstractAccumuloStorage {
+ private static final Log LOG = LogFactory.getLog(AccumuloStorage.class);
+
+ public AccumuloStorage() {}
+
+ @Override
+ protected Tuple getTuple(Key key, Value value) throws IOException {
+ // and wrap it in a tuple
+ Tuple tuple = TupleFactory.getInstance().newTuple(6);
+ tuple.set(0, new DataByteArray(key.getRow().getBytes()));
+ tuple.set(1, new DataByteArray(key.getColumnFamily().getBytes()));
+ tuple.set(2, new DataByteArray(key.getColumnQualifier().getBytes()));
+ tuple.set(3, new DataByteArray(key.getColumnVisibility().getBytes()));
+ tuple.set(4, new Long(key.getTimestamp()));
+ tuple.set(5, new DataByteArray(value.get()));
+ return tuple;
+ }
+
+ @Override
+ public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
+
+ try {
+ Mutation mut = new Mutation(Utils.objToText(tuple.get(0)));
+ Text cf = Utils.objToText(tuple.get(1));
+ Text cq = Utils.objToText(tuple.get(2));
+
+ if (tuple.size() > 4) {
+ Text cv = Utils.objToText(tuple.get(3));
+ Value val = new Value(Utils.objToBytes(tuple.get(4)));
+ if (cv.getLength() == 0) {
+ mut.put(cf, cq, val);
+ } else {
+ mut.put(cf, cq, new ColumnVisibility(cv), val);
+ }
+ } else {
+ Value val = new Value(Utils.objToBytes(tuple.get(3)));
+ mut.put(cf, cq, val);
+ }
+
+ return Collections.singleton(mut);
+ } catch (IOException e) {
+ System.err.println("Error on Tuple: " + tuple);
+ throw e;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/6d494666/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
index a959dda..fcfd55e 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
@@ -43,77 +43,65 @@ import org.apache.pig.data.TupleFactory;
/**
* A LoadStoreFunc for retrieving data from and storing data to Accumulo
- *
- * A Key/Val pair will be returned as tuples: (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArray, timestamp is a long.
*
- * Tuples can be written in 2 forms:
- * (key, colfam, colqual, colvis, value)
- * OR
- * (key, colfam, colqual, value)
+ * A Key/Val pair will be returned as tuples: (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArray, timestamp is a
+ * long.
+ *
+ * Tuples can be written in 2 forms: (key, colfam, colqual, colvis, value) OR (key, colfam, colqual, value)
*
*/
-public class AccumuloWholeRowStorage extends AbstractAccumuloStorage
-{
- private static final Log LOG = LogFactory.getLog(AccumuloWholeRowStorage.class);
-
- public AccumuloWholeRowStorage(){}
-
- @Override
- protected Tuple getTuple(Key key, Value value) throws IOException {
-
- SortedMap<Key, Value> rowKVs = WholeRowIterator.decodeRow(key, value);
- List<Tuple> columns = new ArrayList<Tuple>(rowKVs.size());
- for(Entry<Key, Value> e : rowKVs.entrySet())
- {
- columns.add(columnToTuple(
- e.getKey().getColumnFamily(),
- e.getKey().getColumnQualifier(),
- e.getKey().getColumnVisibility(),
- e.getKey().getTimestamp(),
- e.getValue())
- );
- }
-
- // and wrap it in a tuple
- Tuple tuple = TupleFactory.getInstance().newTuple(2);
- tuple.set(0, new DataByteArray(key.getRow().getBytes()));
- tuple.set(1, new DefaultDataBag(columns));
-
- return tuple;
- }
-
- private Tuple columnToTuple(Text colfam, Text colqual, Text colvis, long ts, Value val) throws IOException
- {
- Tuple tuple = TupleFactory.getInstance().newTuple(5);
- tuple.set(0, new DataByteArray(colfam.getBytes()));
- tuple.set(1, new DataByteArray(colqual.getBytes()));
- tuple.set(2, new DataByteArray(colvis.getBytes()));
- tuple.set(3, new Long(ts));
- tuple.set(4, new DataByteArray(val.get()));
- return tuple;
- }
-
- protected void configureInputFormat(Configuration conf)
- {
- AccumuloInputFormat.addIterator(conf, new IteratorSetting(10, WholeRowIterator.class));
+public class AccumuloWholeRowStorage extends AbstractAccumuloStorage {
+ private static final Log LOG = LogFactory.getLog(AccumuloWholeRowStorage.class);
+
+ public AccumuloWholeRowStorage() {}
+
+ @Override
+ protected Tuple getTuple(Key key, Value value) throws IOException {
+
+ SortedMap<Key,Value> rowKVs = WholeRowIterator.decodeRow(key, value);
+ List<Tuple> columns = new ArrayList<Tuple>(rowKVs.size());
+ for (Entry<Key,Value> e : rowKVs.entrySet()) {
+ columns.add(columnToTuple(e.getKey().getColumnFamily(), e.getKey().getColumnQualifier(), e.getKey().getColumnVisibility(), e.getKey().getTimestamp(),
+ e.getValue()));
}
- @Override
- public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
-
- Mutation mut = new Mutation(Utils.objToText(tuple.get(0)));
- DefaultDataBag columns = (DefaultDataBag)tuple.get(1);
- for(Tuple column : columns)
- {
- Text cf = Utils.objToText(column.get(0));
- Text cq = Utils.objToText(column.get(1));
- Text cv = Utils.objToText(column.get(2));
- Long ts = (Long)column.get(3);
- Value val = new Value(Utils.objToBytes(column.get(4)));
-
- mut.put(cf, cq, new ColumnVisibility(cv), ts, val);
- }
-
- return Collections.singleton(mut);
+ // and wrap it in a tuple
+ Tuple tuple = TupleFactory.getInstance().newTuple(2);
+ tuple.set(0, new DataByteArray(key.getRow().getBytes()));
+ tuple.set(1, new DefaultDataBag(columns));
+
+ return tuple;
+ }
+
+ private Tuple columnToTuple(Text colfam, Text colqual, Text colvis, long ts, Value val) throws IOException {
+ Tuple tuple = TupleFactory.getInstance().newTuple(5);
+ tuple.set(0, new DataByteArray(colfam.getBytes()));
+ tuple.set(1, new DataByteArray(colqual.getBytes()));
+ tuple.set(2, new DataByteArray(colvis.getBytes()));
+ tuple.set(3, new Long(ts));
+ tuple.set(4, new DataByteArray(val.get()));
+ return tuple;
+ }
+
+ protected void configureInputFormat(Configuration conf) {
+ AccumuloInputFormat.addIterator(conf, new IteratorSetting(10, WholeRowIterator.class));
+ }
+
+ @Override
+ public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
+
+ Mutation mut = new Mutation(Utils.objToText(tuple.get(0)));
+ DefaultDataBag columns = (DefaultDataBag) tuple.get(1);
+ for (Tuple column : columns) {
+ Text cf = Utils.objToText(column.get(0));
+ Text cq = Utils.objToText(column.get(1));
+ Text cv = Utils.objToText(column.get(2));
+ Long ts = (Long) column.get(3);
+ Value val = new Value(Utils.objToBytes(column.get(4)));
+
+ mut.put(cf, cq, new ColumnVisibility(cv), ts, val);
}
+
+ return Collections.singleton(mut);
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/6d494666/src/main/java/org/apache/accumulo/pig/Utils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/Utils.java b/src/main/java/org/apache/accumulo/pig/Utils.java
index 6b7fdf9..4fecd99 100644
--- a/src/main/java/org/apache/accumulo/pig/Utils.java
+++ b/src/main/java/org/apache/accumulo/pig/Utils.java
@@ -20,45 +20,37 @@ import org.apache.hadoop.io.Text;
import org.apache.pig.data.DataByteArray;
public class Utils {
- public static Text objToText(Object o)
- {
- return new Text(objToBytes(o));
+ public static Text objToText(Object o) {
+ return new Text(objToBytes(o));
+ }
+
+ public static byte[] objToBytes(Object o) {
+ if (o == null) {
+ return new byte[0];
}
- public static byte[] objToBytes(Object o)
- {
- if( o == null)
- {
- return new byte[0];
- }
-
- if (o instanceof String) {
- String str = (String) o;
- return str.getBytes();
- }
- else if (o instanceof Long) {
- Long l = (Long) o;
- return l.toString().getBytes();
- }
- else if (o instanceof Integer) {
- Integer l = (Integer) o;
- return l.toString().getBytes();
- }
- else if (o instanceof Boolean) {
- Boolean l = (Boolean) o;
- return l.toString().getBytes();
- }
- else if (o instanceof Float) {
- Float l = (Float) o;
- return l.toString().getBytes();
- }
- else if (o instanceof Double) {
- Double l = (Double) o;
- return l.toString().getBytes();
- }
-
- // TODO: handle DataBag, Map<Object, Object>, and Tuple
-
- return ((DataByteArray)o).get();
+ if (o instanceof String) {
+ String str = (String) o;
+ return str.getBytes();
+ } else if (o instanceof Long) {
+ Long l = (Long) o;
+ return l.toString().getBytes();
+ } else if (o instanceof Integer) {
+ Integer l = (Integer) o;
+ return l.toString().getBytes();
+ } else if (o instanceof Boolean) {
+ Boolean l = (Boolean) o;
+ return l.toString().getBytes();
+ } else if (o instanceof Float) {
+ Float l = (Float) o;
+ return l.toString().getBytes();
+ } else if (o instanceof Double) {
+ Double l = (Double) o;
+ return l.toString().getBytes();
}
+
+ // TODO: handle DataBag, Map<Object, Object>, and Tuple
+
+ return ((DataByteArray) o).get();
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/6d494666/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
index 402c74a..9b9c3c7 100644
--- a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
@@ -36,121 +36,116 @@ import org.apache.pig.data.Tuple;
import org.junit.Test;
public class AbstractAccumuloStorageTest {
-
- public Job getExpectedLoadJob(String inst, String zookeepers, String user, String password,String table,
- String start,String end,Authorizations authorizations, List<Pair<Text, Text>> columnFamilyColumnQualifierPairs) throws IOException
- {
- Collection<Range> ranges = new LinkedList<Range>();
- ranges.add(new Range(start, end));
-
- Job expected = new Job();
- Configuration expectedConf = expected.getConfiguration();
- AccumuloInputFormat.setInputInfo(expectedConf, user, password.getBytes(), table, authorizations);
- AccumuloInputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers);
- AccumuloInputFormat.fetchColumns(expectedConf, columnFamilyColumnQualifierPairs);
- AccumuloInputFormat.setRanges(expectedConf, ranges);
- return expected;
- }
-
- public Job getDefaultExpectedLoadJob() throws IOException
- {
- String inst = "myinstance";
- String zookeepers = "127.0.0.1:2181";
- String user = "root";
- String password = "secret";
- String table = "table1";
- String start = "abc";
- String end = "z";
- Authorizations authorizations = new Authorizations("PRIVATE,PUBLIC".split(","));
-
- List<Pair<Text, Text>> columnFamilyColumnQualifierPairs = new LinkedList<Pair<Text,Text>>();
- columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>(new Text("col1"), new Text("cq1")));
- columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>(new Text("col2"), new Text("cq2")));
- columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>(new Text("col3"), null));
-
- Job expected = getExpectedLoadJob(inst, zookeepers, user, password, table, start, end, authorizations, columnFamilyColumnQualifierPairs);
- return expected;
- }
-
- public Job getExpectedStoreJob(String inst, String zookeepers, String user, String password,String table, long maxWriteBufferSize, int writeThreads, int maxWriteLatencyMS) throws IOException
- {
- Job expected = new Job();
- Configuration expectedConf = expected.getConfiguration();
- AccumuloOutputFormat.setOutputInfo(expectedConf, user, password.getBytes(), true, table);
- AccumuloOutputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers);
- AccumuloOutputFormat.setMaxLatency(expectedConf, maxWriteLatencyMS);
- AccumuloOutputFormat.setMaxMutationBufferSize(expectedConf, maxWriteBufferSize);
- AccumuloOutputFormat.setMaxWriteThreads(expectedConf, writeThreads);
-
- return expected;
- }
-
- public Job getDefaultExpectedStoreJob() throws IOException
- {
- String inst = "myinstance";
- String zookeepers = "127.0.0.1:2181";
- String user = "root";
- String password = "secret";
- String table = "table1";
- long maxWriteBufferSize = 1234000;
- int writeThreads = 7;
- int maxWriteLatencyMS = 30000;
-
- Job expected = getExpectedStoreJob(inst, zookeepers, user, password, table, maxWriteBufferSize, writeThreads, maxWriteLatencyMS);
- return expected;
- }
-
- public String getDefaultLoadLocation()
- {
- return "accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&columns=col1|cq1,col2|cq2,col3&start=abc&end=z";
- }
-
- public String getDefaultStoreLocation()
- {
- return "accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&write_buffer_size_bytes=1234000&write_threads=7&write_latency_ms=30000";
- }
-
- public AbstractAccumuloStorage getAbstractAccumuloStorage()
- {
- AbstractAccumuloStorage s = new AbstractAccumuloStorage() {
-
- @Override
- public Collection<Mutation> getMutations(Tuple tuple) {return null;}
-
- @Override
- protected Tuple getTuple(Key key, Value value) throws IOException {return null;}
- };
- return s;
- }
-
-
- @Test
- public void testSetLoadLocation() throws IOException
- {
- AbstractAccumuloStorage s = getAbstractAccumuloStorage();
-
- Job actual = new Job();
- s.setLocation(getDefaultLoadLocation(), actual);
- Configuration actualConf = actual.getConfiguration();
-
- Job expected = getDefaultExpectedLoadJob();
- Configuration expectedConf = expected.getConfiguration();
-
- TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
- }
-
- @Test
- public void testSetStoreLocation() throws IOException
- {
- AbstractAccumuloStorage s = getAbstractAccumuloStorage();
-
- Job actual = new Job();
- s.setStoreLocation(getDefaultStoreLocation(), actual);
- Configuration actualConf = actual.getConfiguration();
-
- Job expected = getDefaultExpectedStoreJob();
- Configuration expectedConf = expected.getConfiguration();
-
- TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
- }
+
+ public Job getExpectedLoadJob(String inst, String zookeepers, String user, String password, String table, String start, String end,
+ Authorizations authorizations, List<Pair<Text,Text>> columnFamilyColumnQualifierPairs) throws IOException {
+ Collection<Range> ranges = new LinkedList<Range>();
+ ranges.add(new Range(start, end));
+
+ Job expected = new Job();
+ Configuration expectedConf = expected.getConfiguration();
+ AccumuloInputFormat.setInputInfo(expectedConf, user, password.getBytes(), table, authorizations);
+ AccumuloInputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers);
+ AccumuloInputFormat.fetchColumns(expectedConf, columnFamilyColumnQualifierPairs);
+ AccumuloInputFormat.setRanges(expectedConf, ranges);
+ return expected;
+ }
+
+ public Job getDefaultExpectedLoadJob() throws IOException {
+ String inst = "myinstance";
+ String zookeepers = "127.0.0.1:2181";
+ String user = "root";
+ String password = "secret";
+ String table = "table1";
+ String start = "abc";
+ String end = "z";
+ Authorizations authorizations = new Authorizations("PRIVATE,PUBLIC".split(","));
+
+ List<Pair<Text,Text>> columnFamilyColumnQualifierPairs = new LinkedList<Pair<Text,Text>>();
+ columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text("col1"), new Text("cq1")));
+ columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text("col2"), new Text("cq2")));
+ columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text("col3"), null));
+
+ Job expected = getExpectedLoadJob(inst, zookeepers, user, password, table, start, end, authorizations, columnFamilyColumnQualifierPairs);
+ return expected;
+ }
+
+ public Job getExpectedStoreJob(String inst, String zookeepers, String user, String password, String table, long maxWriteBufferSize, int writeThreads,
+ int maxWriteLatencyMS) throws IOException {
+ Job expected = new Job();
+ Configuration expectedConf = expected.getConfiguration();
+ AccumuloOutputFormat.setOutputInfo(expectedConf, user, password.getBytes(), true, table);
+ AccumuloOutputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers);
+ AccumuloOutputFormat.setMaxLatency(expectedConf, maxWriteLatencyMS);
+ AccumuloOutputFormat.setMaxMutationBufferSize(expectedConf, maxWriteBufferSize);
+ AccumuloOutputFormat.setMaxWriteThreads(expectedConf, writeThreads);
+
+ return expected;
+ }
+
+ public Job getDefaultExpectedStoreJob() throws IOException {
+ String inst = "myinstance";
+ String zookeepers = "127.0.0.1:2181";
+ String user = "root";
+ String password = "secret";
+ String table = "table1";
+ long maxWriteBufferSize = 1234000;
+ int writeThreads = 7;
+ int maxWriteLatencyMS = 30000;
+
+ Job expected = getExpectedStoreJob(inst, zookeepers, user, password, table, maxWriteBufferSize, writeThreads, maxWriteLatencyMS);
+ return expected;
+ }
+
+ public String getDefaultLoadLocation() {
+ return "accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&columns=col1|cq1,col2|cq2,col3&start=abc&end=z";
+ }
+
+ public String getDefaultStoreLocation() {
+ return "accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&write_buffer_size_bytes=1234000&write_threads=7&write_latency_ms=30000";
+ }
+
+ public AbstractAccumuloStorage getAbstractAccumuloStorage() {
+ AbstractAccumuloStorage s = new AbstractAccumuloStorage() {
+
+ @Override
+ public Collection<Mutation> getMutations(Tuple tuple) {
+ return null;
+ }
+
+ @Override
+ protected Tuple getTuple(Key key, Value value) throws IOException {
+ return null;
+ }
+ };
+ return s;
+ }
+
+ @Test
+ public void testSetLoadLocation() throws IOException {
+ AbstractAccumuloStorage s = getAbstractAccumuloStorage();
+
+ Job actual = new Job();
+ s.setLocation(getDefaultLoadLocation(), actual);
+ Configuration actualConf = actual.getConfiguration();
+
+ Job expected = getDefaultExpectedLoadJob();
+ Configuration expectedConf = expected.getConfiguration();
+
+ TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
+ }
+
+ @Test
+ public void testSetStoreLocation() throws IOException {
+ AbstractAccumuloStorage s = getAbstractAccumuloStorage();
+
+ Job actual = new Job();
+ s.setStoreLocation(getDefaultStoreLocation(), actual);
+ Configuration actualConf = actual.getConfiguration();
+
+ Job expected = getDefaultExpectedStoreJob();
+ Configuration expectedConf = expected.getConfiguration();
+
+ TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/6d494666/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
index dd45a1a..fbd68c6 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
@@ -33,75 +33,72 @@ import org.apache.pig.data.TupleFactory;
import org.junit.Test;
public class AccumuloStorageTest {
-
- @Test
- public void testGetMutations4() throws Exception
- {
- AccumuloStorage s = new AccumuloStorage();
-
- Tuple tuple = TupleFactory.getInstance().newTuple(4);
- tuple.set(0, "row1");
- tuple.set(1, "cf1");
- tuple.set(2, "cq1");
- tuple.set(3, "val1");
-
- Collection<Mutation> muts = s.getMutations(tuple);
-
- assertNotNull(muts);
- assertEquals(1, muts.size());
- Mutation mut = muts.iterator().next();
- List<ColumnUpdate> updates = mut.getUpdates();
- assertEquals(1, updates.size());
- ColumnUpdate update = updates.get(0);
-
- assertTrue(Arrays.equals(((String)tuple.get(0)).getBytes(), mut.getRow()));
- assertTrue(Arrays.equals(((String)tuple.get(1)).getBytes(), update.getColumnFamily()));
- assertTrue(Arrays.equals(((String)tuple.get(2)).getBytes(), update.getColumnQualifier()));
- assertTrue(Arrays.equals(((String)tuple.get(3)).getBytes(), update.getValue()));
- assertTrue(Arrays.equals("".getBytes(), update.getColumnVisibility()));
- }
-
- @Test
- public void testGetMutations5() throws Exception
- {
- AccumuloStorage s = new AccumuloStorage();
-
- Tuple tuple = TupleFactory.getInstance().newTuple(5);
- tuple.set(0, "row1");
- tuple.set(1, "cf1");
- tuple.set(2, "cq1");
- tuple.set(3, "cv1");
- tuple.set(4, "val1");
-
- Collection<Mutation> muts = s.getMutations(tuple);
-
- assertNotNull(muts);
- assertEquals(1, muts.size());
- Mutation mut = muts.iterator().next();
- List<ColumnUpdate> updates = mut.getUpdates();
- assertEquals(1, updates.size());
- ColumnUpdate update = updates.get(0);
-
- assertTrue(Arrays.equals(((String)tuple.get(0)).getBytes(), mut.getRow()));
- assertTrue(Arrays.equals(((String)tuple.get(1)).getBytes(), update.getColumnFamily()));
- assertTrue(Arrays.equals(((String)tuple.get(2)).getBytes(), update.getColumnQualifier()));
- assertTrue(Arrays.equals(((String)tuple.get(3)).getBytes(), update.getColumnVisibility()));
- assertTrue(Arrays.equals(((String)tuple.get(4)).getBytes(), update.getValue()));
- }
-
- @Test
- public void testGetTuple() throws Exception
- {
- AccumuloStorage s = new AccumuloStorage();
-
- Key key = new Key("row1", "cf1", "cq1", "cv1", 1024L);
- Value value = new Value("val1".getBytes());
- Tuple tuple = s.getTuple(key, value);
- TestUtils.assertKeyValueEqualsTuple(key, value, tuple);
-
- key = new Key("row1", "cf1", "cq1");
- value = new Value("val1".getBytes());
- tuple = s.getTuple(key, value);
- TestUtils.assertKeyValueEqualsTuple(key, value, tuple);
- }
+
+ @Test
+ public void testGetMutations4() throws Exception {
+ AccumuloStorage s = new AccumuloStorage();
+
+ Tuple tuple = TupleFactory.getInstance().newTuple(4);
+ tuple.set(0, "row1");
+ tuple.set(1, "cf1");
+ tuple.set(2, "cq1");
+ tuple.set(3, "val1");
+
+ Collection<Mutation> muts = s.getMutations(tuple);
+
+ assertNotNull(muts);
+ assertEquals(1, muts.size());
+ Mutation mut = muts.iterator().next();
+ List<ColumnUpdate> updates = mut.getUpdates();
+ assertEquals(1, updates.size());
+ ColumnUpdate update = updates.get(0);
+
+ assertTrue(Arrays.equals(((String) tuple.get(0)).getBytes(), mut.getRow()));
+ assertTrue(Arrays.equals(((String) tuple.get(1)).getBytes(), update.getColumnFamily()));
+ assertTrue(Arrays.equals(((String) tuple.get(2)).getBytes(), update.getColumnQualifier()));
+ assertTrue(Arrays.equals(((String) tuple.get(3)).getBytes(), update.getValue()));
+ assertTrue(Arrays.equals("".getBytes(), update.getColumnVisibility()));
+ }
+
+ @Test
+ public void testGetMutations5() throws Exception {
+ AccumuloStorage s = new AccumuloStorage();
+
+ Tuple tuple = TupleFactory.getInstance().newTuple(5);
+ tuple.set(0, "row1");
+ tuple.set(1, "cf1");
+ tuple.set(2, "cq1");
+ tuple.set(3, "cv1");
+ tuple.set(4, "val1");
+
+ Collection<Mutation> muts = s.getMutations(tuple);
+
+ assertNotNull(muts);
+ assertEquals(1, muts.size());
+ Mutation mut = muts.iterator().next();
+ List<ColumnUpdate> updates = mut.getUpdates();
+ assertEquals(1, updates.size());
+ ColumnUpdate update = updates.get(0);
+
+ assertTrue(Arrays.equals(((String) tuple.get(0)).getBytes(), mut.getRow()));
+ assertTrue(Arrays.equals(((String) tuple.get(1)).getBytes(), update.getColumnFamily()));
+ assertTrue(Arrays.equals(((String) tuple.get(2)).getBytes(), update.getColumnQualifier()));
+ assertTrue(Arrays.equals(((String) tuple.get(3)).getBytes(), update.getColumnVisibility()));
+ assertTrue(Arrays.equals(((String) tuple.get(4)).getBytes(), update.getValue()));
+ }
+
+ @Test
+ public void testGetTuple() throws Exception {
+ AccumuloStorage s = new AccumuloStorage();
+
+ Key key = new Key("row1", "cf1", "cq1", "cv1", 1024L);
+ Value value = new Value("val1".getBytes());
+ Tuple tuple = s.getTuple(key, value);
+ TestUtils.assertKeyValueEqualsTuple(key, value, tuple);
+
+ key = new Key("row1", "cf1", "cq1");
+ value = new Value("val1".getBytes());
+ tuple = s.getTuple(key, value);
+ TestUtils.assertKeyValueEqualsTuple(key, value, tuple);
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/6d494666/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
index 750ebea..f8e8fe1 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
@@ -45,120 +45,108 @@ import org.apache.pig.data.TupleFactory;
import org.junit.Test;
public class AccumuloWholeRowStorageTest {
-
- @Test
- public void testConfiguration() throws IOException
- {
- AbstractAccumuloStorageTest test = new AbstractAccumuloStorageTest();
-
- AccumuloWholeRowStorage s = new AccumuloWholeRowStorage();
-
- Job actual = new Job();
- s.setLocation(test.getDefaultLoadLocation(), actual);
- Configuration actualConf = actual.getConfiguration();
-
- Job expected = test.getDefaultExpectedLoadJob();
- Configuration expectedConf = expected.getConfiguration();
- AccumuloInputFormat.addIterator(expectedConf, new IteratorSetting(10, WholeRowIterator.class));
-
- TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
- }
-
- public static Tuple generateTuple(String cf, String cq, String cv, Long ts, String val) throws ExecException
- {
- Tuple tuple = TupleFactory.getInstance().newTuple(5);
- tuple.set(0, new DataByteArray(cf.getBytes()));
- tuple.set(1, new DataByteArray(cq.getBytes()));
- tuple.set(2, new DataByteArray(cv.getBytes()));
- tuple.set(3, ts);
- tuple.set(4, new DataByteArray(val.getBytes()));
- return tuple;
- }
-
- @Test
- public void testGetMutations() throws Exception
- {
- Tuple tuple = TupleFactory.getInstance().newTuple(2);
- tuple.set(0, "row1");
-
- DefaultDataBag bag = new DefaultDataBag();
- bag.add(generateTuple("cf1", "cq1", "cv1", 1L, "val1"));
- bag.add(generateTuple("cf2", "cq2", "cv2", 2L, "val2"));
- bag.add(generateTuple("cf3", "cq3", "cv3", 3L, "val3"));
- tuple.set(1, bag);
-
- AccumuloWholeRowStorage s = new AccumuloWholeRowStorage();
- Collection<Mutation> muts = s.getMutations(tuple);
-
- assertNotNull(muts);
- assertEquals(1, muts.size());
- Mutation mut = muts.iterator().next();
-
- List<ColumnUpdate> updates = mut.getUpdates();
- assertEquals(3, updates.size());
-
- assertTrue(Arrays.equals(((String)tuple.get(0)).getBytes(), mut.getRow()));
-
- Iterator<Tuple> iter = bag.iterator();
- for(ColumnUpdate update : updates)
- {
- Tuple colTuple = iter.next();
-
- assertTrue(Arrays.equals(((DataByteArray)colTuple.get(0)).get(), update.getColumnFamily()));
- assertTrue(Arrays.equals(((DataByteArray)colTuple.get(1)).get(), update.getColumnQualifier()));
- assertTrue(Arrays.equals(((DataByteArray)colTuple.get(2)).get(), update.getColumnVisibility()));
- assertEquals(((Long)colTuple.get(3)).longValue(), update.getTimestamp());
- assertTrue(Arrays.equals(((DataByteArray)colTuple.get(4)).get(), update.getValue()));
- }
- }
-
- @Test
- public void testGetTuple() throws Exception
- {
- AccumuloWholeRowStorage s = new AccumuloWholeRowStorage();
-
- Key key = new Key("row");
-
- List<Key> keys = new ArrayList<Key>(3);
- keys.add(new Key("row", "cf1", "cf1", "cv1", 1L));
- keys.add(new Key("row", "cf2", "cf2", "cv2", 2L));
- keys.add(new Key("row", "cf3", "cf3", "cv3", 3L));
-
- List<Value> values = new ArrayList<Value>(3);
- values.add(new Value("1".getBytes()));
- values.add(new Value("2".getBytes()));
- values.add(new Value("3".getBytes()));
-
- Value value = WholeRowIterator.encodeRow(keys, values);
-
- List<Tuple> columns = new LinkedList<Tuple>();
- for(int i = 0; i < keys.size(); ++i)
- {
- columns.add(columnToTuple(
- keys.get(i).getColumnFamily().toString(),
- keys.get(i).getColumnQualifier().toString(),
- keys.get(i).getColumnVisibility().toString(),
- keys.get(i).getTimestamp(),
- new String(values.get(i).get())));
- }
-
- Tuple tuple = TupleFactory.getInstance().newTuple(2);
- tuple.set(0, new DataByteArray(key.getRow().getBytes()));
- tuple.set(1, new DefaultDataBag(columns));
-
- TestUtils.assertWholeRowKeyValueEqualsTuple(key, value, tuple);
- }
-
- private Tuple columnToTuple(String colfam, String colqual, String colvis, long ts, String val) throws IOException
- {
- Tuple tuple = TupleFactory.getInstance().newTuple(5);
- tuple.set(0, new DataByteArray(colfam.getBytes()));
- tuple.set(1, new DataByteArray(colqual.getBytes()));
- tuple.set(2, new DataByteArray(colvis.getBytes()));
- tuple.set(3, new Long(ts));
- tuple.set(4, new DataByteArray(val.getBytes()));
- return tuple;
+
+ @Test
+ public void testConfiguration() throws IOException {
+ AbstractAccumuloStorageTest test = new AbstractAccumuloStorageTest();
+
+ AccumuloWholeRowStorage s = new AccumuloWholeRowStorage();
+
+ Job actual = new Job();
+ s.setLocation(test.getDefaultLoadLocation(), actual);
+ Configuration actualConf = actual.getConfiguration();
+
+ Job expected = test.getDefaultExpectedLoadJob();
+ Configuration expectedConf = expected.getConfiguration();
+ AccumuloInputFormat.addIterator(expectedConf, new IteratorSetting(10, WholeRowIterator.class));
+
+ TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
+ }
+
+ public static Tuple generateTuple(String cf, String cq, String cv, Long ts, String val) throws ExecException {
+ Tuple tuple = TupleFactory.getInstance().newTuple(5);
+ tuple.set(0, new DataByteArray(cf.getBytes()));
+ tuple.set(1, new DataByteArray(cq.getBytes()));
+ tuple.set(2, new DataByteArray(cv.getBytes()));
+ tuple.set(3, ts);
+ tuple.set(4, new DataByteArray(val.getBytes()));
+ return tuple;
+ }
+
+ @Test
+ public void testGetMutations() throws Exception {
+ Tuple tuple = TupleFactory.getInstance().newTuple(2);
+ tuple.set(0, "row1");
+
+ DefaultDataBag bag = new DefaultDataBag();
+ bag.add(generateTuple("cf1", "cq1", "cv1", 1L, "val1"));
+ bag.add(generateTuple("cf2", "cq2", "cv2", 2L, "val2"));
+ bag.add(generateTuple("cf3", "cq3", "cv3", 3L, "val3"));
+ tuple.set(1, bag);
+
+ AccumuloWholeRowStorage s = new AccumuloWholeRowStorage();
+ Collection<Mutation> muts = s.getMutations(tuple);
+
+ assertNotNull(muts);
+ assertEquals(1, muts.size());
+ Mutation mut = muts.iterator().next();
+
+ List<ColumnUpdate> updates = mut.getUpdates();
+ assertEquals(3, updates.size());
+
+ assertTrue(Arrays.equals(((String) tuple.get(0)).getBytes(), mut.getRow()));
+
+ Iterator<Tuple> iter = bag.iterator();
+ for (ColumnUpdate update : updates) {
+ Tuple colTuple = iter.next();
+
+ assertTrue(Arrays.equals(((DataByteArray) colTuple.get(0)).get(), update.getColumnFamily()));
+ assertTrue(Arrays.equals(((DataByteArray) colTuple.get(1)).get(), update.getColumnQualifier()));
+ assertTrue(Arrays.equals(((DataByteArray) colTuple.get(2)).get(), update.getColumnVisibility()));
+ assertEquals(((Long) colTuple.get(3)).longValue(), update.getTimestamp());
+ assertTrue(Arrays.equals(((DataByteArray) colTuple.get(4)).get(), update.getValue()));
}
-
-
+ }
+
+ @Test
+ public void testGetTuple() throws Exception {
+ AccumuloWholeRowStorage s = new AccumuloWholeRowStorage();
+
+ Key key = new Key("row");
+
+ List<Key> keys = new ArrayList<Key>(3);
+ keys.add(new Key("row", "cf1", "cf1", "cv1", 1L));
+ keys.add(new Key("row", "cf2", "cf2", "cv2", 2L));
+ keys.add(new Key("row", "cf3", "cf3", "cv3", 3L));
+
+ List<Value> values = new ArrayList<Value>(3);
+ values.add(new Value("1".getBytes()));
+ values.add(new Value("2".getBytes()));
+ values.add(new Value("3".getBytes()));
+
+ Value value = WholeRowIterator.encodeRow(keys, values);
+
+ List<Tuple> columns = new LinkedList<Tuple>();
+ for (int i = 0; i < keys.size(); ++i) {
+ columns.add(columnToTuple(keys.get(i).getColumnFamily().toString(), keys.get(i).getColumnQualifier().toString(), keys.get(i).getColumnVisibility()
+ .toString(), keys.get(i).getTimestamp(), new String(values.get(i).get())));
+ }
+
+ Tuple tuple = TupleFactory.getInstance().newTuple(2);
+ tuple.set(0, new DataByteArray(key.getRow().getBytes()));
+ tuple.set(1, new DefaultDataBag(columns));
+
+ TestUtils.assertWholeRowKeyValueEqualsTuple(key, value, tuple);
+ }
+
+ private Tuple columnToTuple(String colfam, String colqual, String colvis, long ts, String val) throws IOException {
+ Tuple tuple = TupleFactory.getInstance().newTuple(5);
+ tuple.set(0, new DataByteArray(colfam.getBytes()));
+ tuple.set(1, new DataByteArray(colqual.getBytes()));
+ tuple.set(2, new DataByteArray(colvis.getBytes()));
+ tuple.set(3, new Long(ts));
+ tuple.set(4, new DataByteArray(val.getBytes()));
+ return tuple;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/6d494666/src/test/java/org/apache/accumulo/pig/TestUtils.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/TestUtils.java b/src/test/java/org/apache/accumulo/pig/TestUtils.java
index 5a9019b..6c8bebf 100644
--- a/src/test/java/org/apache/accumulo/pig/TestUtils.java
+++ b/src/test/java/org/apache/accumulo/pig/TestUtils.java
@@ -34,51 +34,45 @@ import org.apache.pig.data.DefaultDataBag;
import org.apache.pig.data.Tuple;
public class TestUtils {
- public static void assertConfigurationsEqual(Configuration expectedConf, Configuration actualConf)
- {
- // Basically, for all the keys in expectedConf, make sure the values in both confs are equal
- Iterator<Entry<String, String>> expectedIter = expectedConf.iterator();
- while(expectedIter.hasNext())
- {
- Entry<String, String> e = expectedIter.next();
- assertEquals(actualConf.get(e.getKey()), expectedConf.get(e.getKey()));
- }
-
- // Basically, for all the keys in actualConf, make sure the values in both confs are equal
- Iterator<Entry<String, String>> actualIter = actualConf.iterator();
- while(actualIter.hasNext())
- {
- Entry<String, String> e = actualIter.next();
- assertEquals(actualConf.get(e.getKey()), expectedConf.get(e.getKey()));
- }
- }
-
- public static void assertKeyValueEqualsTuple(Key key, Value value, Tuple tuple) throws ExecException
- {
- assertTrue(Arrays.equals(key.getRow().getBytes(), ((DataByteArray)tuple.get(0)).get()));
- assertTrue(Arrays.equals(key.getColumnFamily().getBytes(), ((DataByteArray)tuple.get(1)).get()));
- assertTrue(Arrays.equals(key.getColumnQualifier().getBytes(), ((DataByteArray)tuple.get(2)).get()));
- assertTrue(Arrays.equals(key.getColumnVisibility().getBytes(), ((DataByteArray)tuple.get(3)).get()));
- assertEquals(key.getTimestamp(), ((Long)tuple.get(4)).longValue());
- assertTrue(Arrays.equals(value.get(), ((DataByteArray)tuple.get(5)).get()));
- }
-
- public static void assertWholeRowKeyValueEqualsTuple(Key key, Value value, Tuple mainTuple) throws IOException
- {
- assertTrue(Arrays.equals(key.getRow().getBytes(), ((DataByteArray)mainTuple.get(0)).get()));
-
- DefaultDataBag bag = (DefaultDataBag)mainTuple.get(1);
- Iterator<Tuple> iter = bag.iterator();
-
- for(Entry<Key, Value> e : WholeRowIterator.decodeRow(key, value).entrySet())
- {
- Tuple tuple = iter.next();
-
- assertTrue(Arrays.equals(e.getKey().getColumnFamily().getBytes(), ((DataByteArray)tuple.get(0)).get()));
- assertTrue(Arrays.equals(e.getKey().getColumnQualifier().getBytes(), ((DataByteArray)tuple.get(1)).get()));
- assertTrue(Arrays.equals(e.getKey().getColumnVisibility().getBytes(), ((DataByteArray)tuple.get(2)).get()));
- assertEquals(e.getKey().getTimestamp(), ((Long)tuple.get(3)).longValue());
- assertTrue(Arrays.equals(e.getValue().get(), ((DataByteArray)tuple.get(4)).get()));
- }
- }
+ public static void assertConfigurationsEqual(Configuration expectedConf, Configuration actualConf) {
+ // Basically, for all the keys in expectedConf, make sure the values in both confs are equal
+ Iterator<Entry<String,String>> expectedIter = expectedConf.iterator();
+ while (expectedIter.hasNext()) {
+ Entry<String,String> e = expectedIter.next();
+ assertEquals(actualConf.get(e.getKey()), expectedConf.get(e.getKey()));
+ }
+
+ // Basically, for all the keys in actualConf, make sure the values in both confs are equal
+ Iterator<Entry<String,String>> actualIter = actualConf.iterator();
+ while (actualIter.hasNext()) {
+ Entry<String,String> e = actualIter.next();
+ assertEquals(actualConf.get(e.getKey()), expectedConf.get(e.getKey()));
+ }
+ }
+
+ public static void assertKeyValueEqualsTuple(Key key, Value value, Tuple tuple) throws ExecException {
+ assertTrue(Arrays.equals(key.getRow().getBytes(), ((DataByteArray) tuple.get(0)).get()));
+ assertTrue(Arrays.equals(key.getColumnFamily().getBytes(), ((DataByteArray) tuple.get(1)).get()));
+ assertTrue(Arrays.equals(key.getColumnQualifier().getBytes(), ((DataByteArray) tuple.get(2)).get()));
+ assertTrue(Arrays.equals(key.getColumnVisibility().getBytes(), ((DataByteArray) tuple.get(3)).get()));
+ assertEquals(key.getTimestamp(), ((Long) tuple.get(4)).longValue());
+ assertTrue(Arrays.equals(value.get(), ((DataByteArray) tuple.get(5)).get()));
+ }
+
+ public static void assertWholeRowKeyValueEqualsTuple(Key key, Value value, Tuple mainTuple) throws IOException {
+ assertTrue(Arrays.equals(key.getRow().getBytes(), ((DataByteArray) mainTuple.get(0)).get()));
+
+ DefaultDataBag bag = (DefaultDataBag) mainTuple.get(1);
+ Iterator<Tuple> iter = bag.iterator();
+
+ for (Entry<Key,Value> e : WholeRowIterator.decodeRow(key, value).entrySet()) {
+ Tuple tuple = iter.next();
+
+ assertTrue(Arrays.equals(e.getKey().getColumnFamily().getBytes(), ((DataByteArray) tuple.get(0)).get()));
+ assertTrue(Arrays.equals(e.getKey().getColumnQualifier().getBytes(), ((DataByteArray) tuple.get(1)).get()));
+ assertTrue(Arrays.equals(e.getKey().getColumnVisibility().getBytes(), ((DataByteArray) tuple.get(2)).get()));
+ assertEquals(e.getKey().getTimestamp(), ((Long) tuple.get(3)).longValue());
+ assertTrue(Arrays.equals(e.getValue().get(), ((DataByteArray) tuple.get(4)).get()));
+ }
+ }
}