You are viewing a plain-text version of this content. The canonical version was linked from the original archive page ("here"); that hyperlink is not preserved in this plain-text copy.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/02/04 21:37:49 UTC
svn commit: r1564474 - in /pig/trunk: CHANGES.txt
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/SequenceFileLoader.java
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestSequenceFileLoader.java
Author: rohini
Date: Tue Feb 4 20:37:49 2014
New Revision: 1564474
URL: http://svn.apache.org/r1564474
Log:
PIG-3744: SequenceFileLoader does not support BytesWritable (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/SequenceFileLoader.java
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestSequenceFileLoader.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1564474&r1=1564473&r2=1564474&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Feb 4 20:37:49 2014
@@ -87,6 +87,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-3744: SequenceFileLoader does not support BytesWritable (rohini)
+
PIG-3726: Ranking empty records leads to NullPointerException (jarcec via daijy)
PIG-3652: Pigmix parser (PigPerformanceLoader) deletes chars during parsing (keren3000 via daijy)
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/SequenceFileLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/SequenceFileLoader.java?rev=1564474&r1=1564473&r2=1564474&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/SequenceFileLoader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/SequenceFileLoader.java Tue Feb 4 20:37:49 2014
@@ -21,12 +21,11 @@ import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
-import org.joda.time.DateTime;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
@@ -56,42 +55,42 @@ import org.apache.pig.data.TupleFactory;
**/
public class SequenceFileLoader extends FileInputLoadFunc {
-
+
private SequenceFileRecordReader<Writable, Writable> reader;
-
+
private Writable key;
private Writable value;
private ArrayList<Object> mProtoTuple = null;
-
+
protected static final Log LOG = LogFactory.getLog(SequenceFileLoader.class);
protected TupleFactory mTupleFactory = TupleFactory.getInstance();
protected SerializationFactory serializationFactory;
protected byte keyType = DataType.UNKNOWN;
protected byte valType = DataType.UNKNOWN;
-
+
public SequenceFileLoader() {
mProtoTuple = new ArrayList<Object>(2);
}
-
+
protected void setKeyType(Class<?> keyClass) throws BackendException {
this.keyType |= inferPigDataType(keyClass);
- if (keyType == DataType.ERROR) {
+ if (keyType == DataType.ERROR) {
LOG.warn("Unable to translate key "+key.getClass()+" to a Pig datatype");
throw new BackendException("Unable to translate "+key.getClass()+" to a Pig datatype");
- }
+ }
}
-
+
protected void setValueType(Class<?> valueClass) throws BackendException {
this.valType |= inferPigDataType(valueClass);
- if (keyType == DataType.ERROR) {
+ if (keyType == DataType.ERROR) {
LOG.warn("Unable to translate key "+key.getClass()+" to a Pig datatype");
throw new BackendException("Unable to translate "+key.getClass()+" to a Pig datatype");
- }
+ }
}
-
+
protected byte inferPigDataType(Type t) {
- if (t == DataByteArray.class) return DataType.BYTEARRAY;
+ if (t == BytesWritable.class) return DataType.BYTEARRAY;
else if (t == Text.class) return DataType.CHARARRAY;
else if (t == IntWritable.class) return DataType.INTEGER;
else if (t == LongWritable.class) return DataType.LONG;
@@ -103,11 +102,14 @@ public class SequenceFileLoader extends
// not doing maps or other complex types for now
else return DataType.ERROR;
}
-
+
protected Object translateWritableToPigDataType(Writable w, byte dataType) {
switch(dataType) {
case DataType.CHARARRAY: return ((Text) w).toString();
- case DataType.BYTEARRAY: return((DataByteArray) w).get();
+ case DataType.BYTEARRAY:
+ BytesWritable bw = (BytesWritable) w;
+ // Make a copy
+ return new DataByteArray(bw.getBytes(), 0, bw.getLength());
case DataType.BOOLEAN: return ((BooleanWritable) w).get();
case DataType.INTEGER: return ((IntWritable) w).get();
case DataType.LONG: return ((LongWritable) w).get();
@@ -116,10 +118,10 @@ public class SequenceFileLoader extends
case DataType.BYTE: return ((ByteWritable) w).get();
case DataType.DATETIME: return ((DateTimeWritable) w).get();
}
-
+
return null;
}
-
+
@Override
public Tuple getNext() throws IOException {
boolean next = false;
@@ -128,19 +130,19 @@ public class SequenceFileLoader extends
} catch (InterruptedException e) {
throw new IOException(e);
}
-
+
if (!next) return null;
-
+
key = reader.getCurrentKey();
value = reader.getCurrentValue();
-
+
if (keyType == DataType.UNKNOWN && key != null) {
setKeyType(key.getClass());
}
if (valType == DataType.UNKNOWN && value != null) {
setValueType(value.getClass());
}
-
+
mProtoTuple.add(translateWritableToPigDataType(key, keyType));
mProtoTuple.add(translateWritableToPigDataType(value, valType));
Tuple t = mTupleFactory.newTuple(mProtoTuple);
@@ -163,6 +165,6 @@ public class SequenceFileLoader extends
@Override
public void setLocation(String location, Job job) throws IOException {
- FileInputFormat.setInputPaths(job, location);
+ FileInputFormat.setInputPaths(job, location);
}
}
Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestSequenceFileLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestSequenceFileLoader.java?rev=1564474&r1=1564473&r2=1564474&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestSequenceFileLoader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestSequenceFileLoader.java Tue Feb 4 20:37:49 2014
@@ -18,21 +18,27 @@
package org.apache.pig.piggybank.test.storage;
import static org.apache.pig.ExecType.LOCAL;
+import static org.apache.pig.builtin.mock.Storage.resetData;
+import static org.apache.pig.builtin.mock.Storage.tuple;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import junit.framework.TestCase;
+
import org.junit.Test;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.PigServer;
+import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
//import org.apache.pig.test.PigExecTestCase;
import org.apache.pig.test.Util;
@@ -43,15 +49,15 @@ import org.apache.pig.test.Util;
"one, two, buckle my shoe",
"three, four, shut the door",
"five, six, something else" };
-
+
private static final String[][] EXPECTED = {
{"0", "one, two, buckle my shoe"},
{"1", "three, four, shut the door"},
{"2", "five, six, something else"}
};
-
+
private String tmpFileName;
-
+
private PigServer pigServer;
@Override
public void setUp() throws Exception {
@@ -62,12 +68,12 @@ import org.apache.pig.test.Util;
Path path = new Path("file:///"+tmpFileName);
JobConf conf = new JobConf();
FileSystem fs = FileSystem.get(path.toUri(), conf);
-
+
IntWritable key = new IntWritable();
Text value = new Text();
SequenceFile.Writer writer = null;
try {
- writer = SequenceFile.createWriter(fs, conf, path,
+ writer = SequenceFile.createWriter(fs, conf, path,
key.getClass(), value.getClass());
for (int i=0; i < DATA.length; i++) {
key.set(i);
@@ -78,10 +84,10 @@ import org.apache.pig.test.Util;
IOUtils.closeStream(writer);
}
}
-
+
@Test
public void testReadsNocast() throws IOException {
- pigServer.registerQuery("A = LOAD '" + Util.encodeEscape(tmpFileName) +
+ pigServer.registerQuery("A = LOAD '" + Util.encodeEscape(tmpFileName) +
"' USING org.apache.pig.piggybank.storage.SequenceFileLoader() AS (key, val);");
Iterator<?> it = pigServer.openIterator("A");
int tupleCount = 0;
@@ -98,10 +104,10 @@ import org.apache.pig.test.Util;
}
assertEquals(DATA.length, tupleCount);
}
-
+
@Test
public void testReadsStringCast() throws IOException {
- pigServer.registerQuery("A = LOAD '" + Util.encodeEscape(tmpFileName) +
+ pigServer.registerQuery("A = LOAD '" + Util.encodeEscape(tmpFileName) +
"' USING org.apache.pig.piggybank.storage.SequenceFileLoader() AS (key:long, val);");
Iterator<?> it = pigServer.openIterator("A");
int tupleCount = 0;
@@ -117,4 +123,41 @@ import org.apache.pig.test.Util;
}
assertEquals(DATA.length, tupleCount);
}
+
+ @Test
+ public void testReadBytesWritable() throws IOException {
+ File inputFile = File.createTempFile("test", ".txt");
+ System.err.println("fileName: " + inputFile.getAbsolutePath());
+ Path path = new Path("file:///" + inputFile.getAbsolutePath());
+ JobConf conf = new JobConf();
+ FileSystem fs = FileSystem.get(path.toUri(), conf);
+
+ IntWritable key = new IntWritable();
+ SequenceFile.Writer writer = null;
+ try {
+ writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), BytesWritable.class);
+ int numRecords = 3;
+ for (int i = 0; i < numRecords; i++) {
+ key.set(i);
+ String val = "" + Math.pow(10, (numRecords - i));
+ writer.append(key, new BytesWritable(val.getBytes()));
+ }
+ } finally {
+ IOUtils.closeStream(writer);
+ }
+
+ Data data = resetData(pigServer);
+ data.set("expected",
+ tuple(0L, new DataByteArray("1000.0")),
+ tuple(1L, new DataByteArray("100.0")),
+ tuple(2L, new DataByteArray("10.0")));
+
+ pigServer.registerQuery(
+ "A = LOAD '" + Util.encodeEscape(inputFile.getAbsolutePath()) +
+ "' USING org.apache.pig.piggybank.storage.SequenceFileLoader() AS (key:long, val);");
+ pigServer.registerQuery("STORE A into 'actual' USING mock.Storage();");
+
+ assertEquals(data.get("expected"), data.get("actual"));
+
+ }
}