You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@pig.apache.org by ro...@apache.org on 2014/02/04 21:37:49 UTC

svn commit: r1564474 - in /pig/trunk: CHANGES.txt contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/SequenceFileLoader.java contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestSequenceFileLoader.java

Author: rohini
Date: Tue Feb  4 20:37:49 2014
New Revision: 1564474

URL: http://svn.apache.org/r1564474
Log:
PIG-3744: SequenceFileLoader does not support BytesWritable (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/SequenceFileLoader.java
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestSequenceFileLoader.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1564474&r1=1564473&r2=1564474&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Feb  4 20:37:49 2014
@@ -87,6 +87,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-3744: SequenceFileLoader does not support BytesWritable (rohini)
+
 PIG-3726: Ranking empty records leads to NullPointerException (jarcec via daijy)
 
 PIG-3652: Pigmix parser (PigPerformanceLoader) deletes chars during parsing (keren3000 via daijy)

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/SequenceFileLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/SequenceFileLoader.java?rev=1564474&r1=1564473&r2=1564474&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/SequenceFileLoader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/SequenceFileLoader.java Tue Feb  4 20:37:49 2014
@@ -21,12 +21,11 @@ import java.io.IOException;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
 
-import org.joda.time.DateTime;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
@@ -56,42 +55,42 @@ import org.apache.pig.data.TupleFactory;
  **/
 
 public class SequenceFileLoader extends FileInputLoadFunc {
-  
+
   private SequenceFileRecordReader<Writable, Writable> reader;
- 
+
   private Writable key;
   private Writable value;
   private ArrayList<Object> mProtoTuple = null;
-  
+
   protected static final Log LOG = LogFactory.getLog(SequenceFileLoader.class);
   protected TupleFactory mTupleFactory = TupleFactory.getInstance();
   protected SerializationFactory serializationFactory;
 
   protected byte keyType = DataType.UNKNOWN;
   protected byte valType = DataType.UNKNOWN;
-    
+
   public SequenceFileLoader() {
     mProtoTuple = new ArrayList<Object>(2);
   }
- 
+
   protected void setKeyType(Class<?> keyClass) throws BackendException {
     this.keyType |= inferPigDataType(keyClass);
-    if (keyType == DataType.ERROR) { 
+    if (keyType == DataType.ERROR) {
       LOG.warn("Unable to translate key "+key.getClass()+" to a Pig datatype");
       throw new BackendException("Unable to translate "+key.getClass()+" to a Pig datatype");
-    } 
+    }
   }
-  
+
   protected void setValueType(Class<?> valueClass) throws BackendException {
     this.valType |= inferPigDataType(valueClass);
-    if (keyType == DataType.ERROR) { 
+    if (keyType == DataType.ERROR) {
       LOG.warn("Unable to translate key "+key.getClass()+" to a Pig datatype");
       throw new BackendException("Unable to translate "+key.getClass()+" to a Pig datatype");
-    } 
+    }
   }
-    
+
   protected byte inferPigDataType(Type t) {
-    if (t == DataByteArray.class) return DataType.BYTEARRAY;
+    if (t == BytesWritable.class) return DataType.BYTEARRAY;
     else if (t == Text.class) return DataType.CHARARRAY;
     else if (t == IntWritable.class) return DataType.INTEGER;
     else if (t == LongWritable.class) return DataType.LONG;
@@ -103,11 +102,14 @@ public class SequenceFileLoader extends 
     // not doing maps or other complex types for now
     else return DataType.ERROR;
   }
-  
+
   protected Object translateWritableToPigDataType(Writable w, byte dataType) {
     switch(dataType) {
       case DataType.CHARARRAY: return ((Text) w).toString();
-      case DataType.BYTEARRAY: return((DataByteArray) w).get();
+      case DataType.BYTEARRAY:
+            BytesWritable bw = (BytesWritable) w;
+            // Make a copy
+            return new DataByteArray(bw.getBytes(), 0, bw.getLength());
       case DataType.BOOLEAN: return ((BooleanWritable) w).get();
       case DataType.INTEGER: return ((IntWritable) w).get();
       case DataType.LONG: return ((LongWritable) w).get();
@@ -116,10 +118,10 @@ public class SequenceFileLoader extends 
       case DataType.BYTE: return ((ByteWritable) w).get();
       case DataType.DATETIME: return ((DateTimeWritable) w).get();
     }
-    
+
     return null;
   }
-  
+
   @Override
   public Tuple getNext() throws IOException {
     boolean next = false;
@@ -128,19 +130,19 @@ public class SequenceFileLoader extends 
     } catch (InterruptedException e) {
       throw new IOException(e);
     }
-    
+
     if (!next) return null;
-    
+
     key = reader.getCurrentKey();
     value = reader.getCurrentValue();
-    
+
     if (keyType == DataType.UNKNOWN && key != null) {
         setKeyType(key.getClass());
     }
     if (valType == DataType.UNKNOWN && value != null) {
         setValueType(value.getClass());
     }
-    
+
     mProtoTuple.add(translateWritableToPigDataType(key, keyType));
     mProtoTuple.add(translateWritableToPigDataType(value, valType));
     Tuple t =  mTupleFactory.newTuple(mProtoTuple);
@@ -163,6 +165,6 @@ public class SequenceFileLoader extends 
 
   @Override
   public void setLocation(String location, Job job) throws IOException {
-    FileInputFormat.setInputPaths(job, location);    
+    FileInputFormat.setInputPaths(job, location);
   }
 }

Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestSequenceFileLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestSequenceFileLoader.java?rev=1564474&r1=1564473&r2=1564474&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestSequenceFileLoader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestSequenceFileLoader.java Tue Feb  4 20:37:49 2014
@@ -18,21 +18,27 @@
 package org.apache.pig.piggybank.test.storage;
 
 import static org.apache.pig.ExecType.LOCAL;
+import static org.apache.pig.builtin.mock.Storage.resetData;
+import static org.apache.pig.builtin.mock.Storage.tuple;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 
 import junit.framework.TestCase;
+
 import org.junit.Test;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.PigServer;
+import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
 //import org.apache.pig.test.PigExecTestCase;
 import org.apache.pig.test.Util;
@@ -43,15 +49,15 @@ import org.apache.pig.test.Util;
     "one, two, buckle my shoe",
     "three, four, shut the door",
     "five, six, something else" };
-  
+
   private static final String[][] EXPECTED = {
     {"0", "one, two, buckle my shoe"},
     {"1", "three, four, shut the door"},
     {"2", "five, six, something else"}
   };
-  
+
   private String tmpFileName;
-  
+
   private PigServer pigServer;
   @Override
   public void setUp() throws Exception {
@@ -62,12 +68,12 @@ import org.apache.pig.test.Util;
     Path path = new Path("file:///"+tmpFileName);
     JobConf conf = new JobConf();
     FileSystem fs = FileSystem.get(path.toUri(), conf);
-    
+
     IntWritable key = new IntWritable();
     Text value = new Text();
     SequenceFile.Writer writer = null;
     try {
-      writer = SequenceFile.createWriter(fs, conf, path, 
+      writer = SequenceFile.createWriter(fs, conf, path,
                                          key.getClass(), value.getClass());
       for (int i=0; i < DATA.length; i++) {
         key.set(i);
@@ -78,10 +84,10 @@ import org.apache.pig.test.Util;
       IOUtils.closeStream(writer);
     }
   }
-  
+
   @Test
   public void testReadsNocast() throws IOException {
-    pigServer.registerQuery("A = LOAD '" + Util.encodeEscape(tmpFileName) + 
+    pigServer.registerQuery("A = LOAD '" + Util.encodeEscape(tmpFileName) +
     "' USING org.apache.pig.piggybank.storage.SequenceFileLoader() AS (key, val);");
     Iterator<?> it = pigServer.openIterator("A");
     int tupleCount = 0;
@@ -98,10 +104,10 @@ import org.apache.pig.test.Util;
     }
     assertEquals(DATA.length, tupleCount);
   }
-  
+
   @Test
   public void testReadsStringCast() throws IOException {
-    pigServer.registerQuery("A = LOAD '" + Util.encodeEscape(tmpFileName) + 
+    pigServer.registerQuery("A = LOAD '" + Util.encodeEscape(tmpFileName) +
     "' USING org.apache.pig.piggybank.storage.SequenceFileLoader() AS (key:long, val);");
     Iterator<?> it = pigServer.openIterator("A");
     int tupleCount = 0;
@@ -117,4 +123,41 @@ import org.apache.pig.test.Util;
     }
     assertEquals(DATA.length, tupleCount);
   }
+
+    @Test
+    public void testReadBytesWritable() throws IOException {
+        File inputFile = File.createTempFile("test", ".txt");
+        System.err.println("fileName: " + inputFile.getAbsolutePath());
+        Path path = new Path("file:///" + inputFile.getAbsolutePath());
+        JobConf conf = new JobConf();
+        FileSystem fs = FileSystem.get(path.toUri(), conf);
+
+        IntWritable key = new IntWritable();
+        SequenceFile.Writer writer = null;
+        try {
+            writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), BytesWritable.class);
+            int numRecords = 3;
+            for (int i = 0; i < numRecords; i++) {
+                key.set(i);
+                String val = "" + Math.pow(10, (numRecords - i));
+                writer.append(key, new BytesWritable(val.getBytes()));
+            }
+        } finally {
+            IOUtils.closeStream(writer);
+        }
+
+        Data data = resetData(pigServer);
+        data.set("expected",
+                tuple(0L, new DataByteArray("1000.0")),
+                tuple(1L, new DataByteArray("100.0")),
+                tuple(2L, new DataByteArray("10.0")));
+
+        pigServer.registerQuery(
+                "A = LOAD '" + Util.encodeEscape(inputFile.getAbsolutePath()) +
+                "' USING org.apache.pig.piggybank.storage.SequenceFileLoader() AS (key:long, val);");
+        pigServer.registerQuery("STORE A into 'actual' USING mock.Storage();");
+
+        assertEquals(data.get("expected"), data.get("actual"));
+
+    }
 }