Posted to commits@pig.apache.org by da...@apache.org on 2013/05/03 19:16:55 UTC

svn commit: r1478879 - in /pig/trunk: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/...

Author: daijy
Date: Fri May  3 17:16:54 2013
New Revision: 1478879

URL: http://svn.apache.org/r1478879
Log:
PIG-3308: Storing data in hive columnar rc format
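
The new StoreFunc is used like any other Pig storage function. A minimal Java sketch of driving it through PigServer, mirroring the added test (the input path and schema below are illustrative, not part of the commit, and the piggybank jar is assumed to be on the classpath):

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class HiveColumnarStorageExample {
        public static void main(String[] args) throws Exception {
            PigServer server = new PigServer(ExecType.LOCAL);
            server.setBatchOn();
            // Hypothetical input: a comma-delimited text file with three string columns.
            server.registerQuery("a = LOAD '/tmp/input' USING PigStorage(',')"
                    + " AS (f1:chararray, f2:chararray, f3:chararray);");
            // Store the relation as Hive columnar RC files under /tmp/rc_output.
            server.store("a", "/tmp/rc_output",
                    "org.apache.pig.piggybank.storage.HiveColumnarStorage()");
        }
    }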

Added:
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HiveColumnarStorage.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCOutputFormat.java
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHiveColumnarStorage.java
Modified:
    pig/trunk/CHANGES.txt

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1478879&r1=1478878&r2=1478879&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri May  3 17:16:54 2013
@@ -28,6 +28,8 @@ PIG-3174:  Remove rpm and deb artifacts 
 
 IMPROVEMENTS
 
+PIG-3308: Storing data in hive columnar rc format (maczech via daijy)
+
 PIG-3303: add hadoop h2 artifact to publications in ivy.xml (julien)
 
 PIG-3169: Remove intermediate data after a job finishes (mwagner via cheolsoo)

Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HiveColumnarStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HiveColumnarStorage.java?rev=1478879&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HiveColumnarStorage.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HiveColumnarStorage.java Fri May  3 17:16:54 2013
@@ -0,0 +1,215 @@
+package org.apache.pig.piggybank.storage;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.piggybank.storage.hiverc.HiveRCOutputFormat;
+
+public class HiveColumnarStorage extends PigStorage {
+    private static final String UTF8 = "UTF-8";
+
+    private static final char LIST_DELIMITER = 2;
+    private static final char MAP_DELIMITER = 3;
+
+    private int numColumns = -1;
+    private ByteStream.Output byteStream;
+    private BytesRefArrayWritable rowWritable;
+    private BytesRefWritable[] colValRefs;
+
+    @Override
+    public OutputFormat getOutputFormat() {
+        return new HiveRCOutputFormat();
+    }
+
+    @Override
+    public void setStoreLocation(String location, Job job) throws IOException {
+        super.setStoreLocation(location, job);
+        // set number of columns if this is set in context.
+        Properties p = getUDFProperties();
+        if (p != null) {
+            numColumns = Integer.parseInt(p.getProperty("numColumns", "-1"));
+        }
+
+        if (numColumns > 0) {
+            RCFileOutputFormat.setColumnNumber(job.getConfiguration(), numColumns);
+        }
+    }
+
+    @Override
+    public void checkSchema(ResourceSchema s) throws IOException {
+        super.checkSchema(s);
+        getUDFProperties().setProperty("numColumns", Integer.toString(s.getFields().length));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void putNext(Tuple t) throws IOException {
+
+        if (rowWritable == null) { // initialize
+            if (numColumns < 1) {
+                throw new IOException("number of columns is not set");
+            }
+
+            byteStream = new ByteStream.Output();
+            rowWritable = new BytesRefArrayWritable();
+            colValRefs = new BytesRefWritable[numColumns];
+
+            for (int i = 0; i < numColumns; i++) {
+                colValRefs[i] = new BytesRefWritable();
+                rowWritable.set(i, colValRefs[i]);
+            }
+        }
+
+        byteStream.reset();
+
+        int sz = t.size();
+        int startPos = 0;
+
+        for (int i = 0; i < sz && i < numColumns; i++) {
+
+            putField(byteStream, t.get(i));
+            colValRefs[i].set(byteStream.getData(), startPos, byteStream.getCount() - startPos);
+            startPos = byteStream.getCount();
+        }
+
+        try {
+            writer.write(null, rowWritable);
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+    }
+
+    private Properties getUDFProperties() {
+        return UDFContext.getUDFContext().getUDFProperties(this.getClass(),
+                new String[] { signature });
+    }
+
+    public void putField(OutputStream out, Object field) throws IOException {
+
+        switch (DataType.findType(field)) {
+        case DataType.NULL:
+            break; // just leave it empty
+
+        case DataType.BOOLEAN:
+            out.write(((Boolean) field).toString().getBytes());
+            break;
+
+        case DataType.INTEGER:
+            out.write(((Integer) field).toString().getBytes());
+            break;
+
+        case DataType.LONG:
+            out.write(((Long) field).toString().getBytes());
+            break;
+
+        case DataType.FLOAT:
+            out.write(((Float) field).toString().getBytes());
+            break;
+
+        case DataType.DOUBLE:
+            out.write(((Double) field).toString().getBytes());
+            break;
+
+        case DataType.BYTEARRAY:
+            byte[] b = ((DataByteArray) field).get();
+            out.write(b, 0, b.length);
+            break;
+
+        case DataType.CHARARRAY:
+            out.write(((String) field).getBytes(UTF8));
+            break;
+
+        case DataType.MAP:
+            boolean mapHasNext = false;
+            Map<String, Object> m = (Map<String, Object>) field;
+
+            for (Map.Entry<String, Object> e : m.entrySet()) {
+                if (mapHasNext) {
+                    out.write(LIST_DELIMITER);
+                } else {
+                    mapHasNext = true;
+                }
+                putField(out, e.getKey());
+                out.write(MAP_DELIMITER);
+                putField(out, e.getValue());
+            }
+
+            break;
+        case DataType.INTERNALMAP:
+            boolean internalMapHasNext = false;
+            Map<String, Object> im = (Map<String, Object>) field;
+
+            for (Map.Entry<String, Object> e : im.entrySet()) {
+                if (internalMapHasNext) {
+                    out.write(LIST_DELIMITER);
+                } else {
+                    internalMapHasNext = true;
+                }
+                putField(out, e.getKey());
+                out.write(MAP_DELIMITER);
+                putField(out, e.getValue());
+            }
+
+            break;
+
+        case DataType.TUPLE:
+            boolean tupleHasNext = false;
+            Tuple t = (Tuple) field;
+
+            for (int i = 0; i < t.size(); ++i) {
+                if (tupleHasNext) {
+                    out.write(LIST_DELIMITER);
+                } else {
+                    tupleHasNext = true;
+                }
+                try {
+                    putField(out, t.get(i));
+                } catch (ExecException ee) {
+                    throw ee;
+                }
+            }
+
+            break;
+
+        case DataType.BAG:
+            boolean bagHasNext = false;
+            Iterator<Tuple> tupleIter = ((DataBag) field).iterator();
+            while (tupleIter.hasNext()) {
+                if (bagHasNext) {
+                    out.write(LIST_DELIMITER);
+                } else {
+                    bagHasNext = true;
+                }
+                putField(out, tupleIter.next());
+            }
+
+            break;
+
+        default: {
+            int errCode = 2108;
+            String msg = "Could not determine data type of field: " + field;
+            throw new ExecException(msg, errCode, PigException.BUG);
+        }
+
+        }
+    }
+
+}
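
As a quick sanity check of the serialization above: putField() joins the elements of complex types with byte 2 and separates map keys from values with byte 3, which match Hive's default collection and map-key delimiters, so ColumnarSerDe can read the nested values back (as the tests below verify). A minimal sketch, assuming the piggybank jar is on the classpath:

    import java.io.ByteArrayOutputStream;
    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.pig.piggybank.storage.HiveColumnarStorage;

    public class PutFieldSketch {
        public static void main(String[] args) throws Exception {
            Map<String, Object> m = new LinkedHashMap<String, Object>();
            m.put("k1", "v1");
            m.put("k2", "v2");

            ByteArrayOutputStream out = new ByteArrayOutputStream();
            new HiveColumnarStorage().putField(out, m);

            // Expected bytes: k1 \x03 v1 \x02 k2 \x03 v2
            for (byte b : out.toByteArray()) {
                System.out.print(b < ' ' ? "\\x0" + (int) b : String.valueOf((char) b));
            }
            System.out.println();
        }
    }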

Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCOutputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCOutputFormat.java?rev=1478879&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCOutputFormat.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCOutputFormat.java Fri May  3 17:16:54 2013
@@ -0,0 +1,113 @@
+package org.apache.pig.piggybank.storage.hiverc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile.Metadata;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveRCOutputFormat extends FileOutputFormat<NullWritable, Writable> {
+
+        private static final Logger LOG = LoggerFactory.getLogger(HiveRCOutputFormat.class);
+
+        public static String COMPRESSION_CODEC_CONF = "rcfile.output.compression.codec";
+
+        public static String DEFAULT_EXTENSION = ".rc";
+        public static String EXTENSION_OVERRIDE_CONF = "rcfile.output.filename.extension"; // "none" disables it.
+
+        /**
+         * Sets the number of columns in the given configuration.
+         *
+         * @param conf
+         *          configuration on which to set the column number
+         * @param columnNum
+         *          number of columns for RCFile's Writer
+         *
+         */
+        public static void setColumnNumber(Configuration conf, int columnNum) {
+          assert columnNum > 0;
+          conf.setInt(RCFile.COLUMN_NUMBER_CONF_STR, columnNum);
+        }
+
+        /**
+         * Returns the number of columns set in the conf for writers.
+         *
+         * @param conf configuration to read the column count from
+         * @return number of columns for RCFile's writer
+         */
+        public static int getColumnNumber(Configuration conf) {
+          return conf.getInt(RCFile.COLUMN_NUMBER_CONF_STR, 0);
+        }
+
+        protected RCFile.Writer createRCFileWriter(TaskAttemptContext job,
+                                                   Text columnMetadata)
+                                                   throws IOException {
+          Configuration conf = job.getConfiguration();
+
+          // override compression codec if set.
+          String codecOverride = conf.get(COMPRESSION_CODEC_CONF);
+          if (codecOverride != null) {
+            conf.setBoolean("mapred.output.compress", true);
+            conf.set("mapred.output.compression.codec", codecOverride);
+          }
+
+          CompressionCodec codec = null;
+          if (getCompressOutput(job)) {
+            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
+            codec = ReflectionUtils.newInstance(codecClass, conf);
+          }
+
+          Metadata metadata = null;
+
+          String ext = conf.get(EXTENSION_OVERRIDE_CONF, DEFAULT_EXTENSION);
+          Path file = getDefaultWorkFile(job, ext.equalsIgnoreCase("none") ? null : ext);
+
+          LOG.info("writing to rcfile " + file.toString());
+
+          return new RCFile.Writer(file.getFileSystem(conf), conf, file, job, metadata, codec);
+        }
+
+        /**
+         * RecordWriter wrapper around an RCFile.Writer
+         */
+        static protected class Writer extends RecordWriter<NullWritable, Writable> {
+
+          private final RCFile.Writer rcfile;
+
+          protected Writer(HiveRCOutputFormat outputFormat,
+                           TaskAttemptContext job,
+                           Text columnMetadata) throws IOException {
+            rcfile = outputFormat.createRCFileWriter(job, columnMetadata);
+          }
+
+          @Override
+          public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+            rcfile.close();
+          }
+
+          @Override
+          public void write(NullWritable key, Writable value) throws IOException, InterruptedException {
+            rcfile.append(value);
+          }
+        }
+
+        @Override
+        public RecordWriter<NullWritable, Writable> getRecordWriter(
+            TaskAttemptContext job) throws IOException, InterruptedException {
+          return new Writer(this, job, null);
+        }
+
+}
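
Two configuration knobs are exposed above: the compression codec override (rcfile.output.compression.codec) and the output-filename extension override (rcfile.output.filename.extension, where "none" drops the default ".rc"). A minimal sketch of setting them on a Hadoop Configuration before a store; the codec class and column count here are illustrative:

    import org.apache.hadoop.conf.Configuration;

    import org.apache.pig.piggybank.storage.hiverc.HiveRCOutputFormat;

    public class HiveRCOutputFormatConfigSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();

            // Compress RCFile output with Gzip; createRCFileWriter() turns this into
            // mapred.output.compress=true plus the codec class.
            conf.set(HiveRCOutputFormat.COMPRESSION_CODEC_CONF,
                    "org.apache.hadoop.io.compress.GzipCodec");

            // Write part files without the default ".rc" extension.
            conf.set(HiveRCOutputFormat.EXTENSION_OVERRIDE_CONF, "none");

            // RCFile writers also need the column count; HiveColumnarStorage normally
            // sets this from the Pig schema in setStoreLocation().
            HiveRCOutputFormat.setColumnNumber(conf, 3);
        }
    }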

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHiveColumnarStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHiveColumnarStorage.java?rev=1478879&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHiveColumnarStorage.java (added)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHiveColumnarStorage.java Fri May  3 17:16:54 2013
@@ -0,0 +1,335 @@
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileRecordReader;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarStruct;
+import org.apache.hadoop.hive.serde2.lazy.LazyArray;
+import org.apache.hadoop.hive.serde2.lazy.LazyMap;
+import org.apache.hadoop.hive.serde2.lazy.LazyString;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.piggybank.storage.hiverc.HiveRCSchemaUtil;
+import org.apache.pig.test.Util;
+import org.junit.Test;
+
+public class TestHiveColumnarStorage extends TestCase {
+
+    static Configuration conf = null;
+    static private FileSystem fs;
+
+    static File simpleDataFile = null;
+    static File simpleDataDir = null;
+
+    static int simpleDirFileCount = 3;
+    static int simpleRowCount = 10;
+    static int columnCount = 3;
+
+
+    @Override
+    public synchronized void setUp() throws Exception {
+
+        conf = new Configuration();
+
+        fs = LocalFileSystem.getLocal(conf);
+
+        produceSimpleData();
+       // Util.deleteDirectory(new File("testhiveColumnarStore"));
+    }
+
+    @Override
+    public void tearDown() {
+        Util.deleteDirectory(simpleDataDir);
+        Util.deleteDirectory(new File("testhiveColumnarStore"));
+        simpleDataFile.delete();
+    }
+
+    @Test
+    public void testShouldStoreRowInHiveFormat() throws IOException, InterruptedException, SerDeException {
+        String loadString = "org.apache.pig.piggybank.storage.HiveColumnarLoader('f1 string,f2 string,f3 string')";
+        String storeString = "org.apache.pig.piggybank.storage.HiveColumnarStorage()";
+
+        String singlePartitionedFile = simpleDataFile.getAbsolutePath();
+        File outputFile = new File("testhiveColumnarStore");
+
+        PigServer server = new PigServer(ExecType.LOCAL);
+        server.setBatchOn();
+        server.registerQuery("a = LOAD '" + singlePartitionedFile + "' using " + loadString
+                + ";");
+
+        //when
+        server.store("a", outputFile.getAbsolutePath(), storeString);
+
+        //then
+        Path outputPath = new Path(outputFile.getAbsolutePath()+"/part-m-00000.rc");
+
+        ColumnarStruct struct = readRow(outputFile, outputPath, "f1 string,f2 string,f3 string");
+
+        assertEquals(3, struct.getFieldsAsList().size());
+        Object o =  struct.getField(0);
+        assertEquals(LazyString.class, o.getClass());
+        o =  struct.getField(1);
+        assertEquals(LazyString.class, o.getClass());
+        o =  struct.getField(2);
+        assertEquals(LazyString.class, o.getClass());
+
+    }
+    @Test
+    public void testShouldStoreTupleAsHiveArray() throws IOException, InterruptedException, SerDeException {
+        String loadString = "org.apache.pig.piggybank.storage.HiveColumnarLoader('f1 string,f2 string,f3 string')";
+        String storeString = "org.apache.pig.piggybank.storage.HiveColumnarStorage()";
+
+        String singlePartitionedFile = simpleDataFile.getAbsolutePath();
+        File outputFile = new File("testhiveColumnarStore");
+
+        PigServer server = new PigServer(ExecType.LOCAL);
+        server.setBatchOn();
+        server.registerQuery("a = LOAD '" + singlePartitionedFile + "' using " + loadString
+                + ";");
+        server.registerQuery("b = FOREACH a GENERATE f1, TOTUPLE(f2,f3);");
+
+        //when
+        server.store("b", outputFile.getAbsolutePath(), storeString);
+
+        //then
+        Path outputPath = new Path(outputFile.getAbsolutePath()+"/part-m-00000.rc");
+
+        ColumnarStruct struct = readRow(outputFile, outputPath, "f1 string,f2 array<string>");
+
+        assertEquals(2, struct.getFieldsAsList().size());
+        Object o =  struct.getField(0);
+        assertEquals(LazyString.class, o.getClass());
+        o =  struct.getField(1);
+        assertEquals(LazyArray.class, o.getClass());
+
+        LazyArray arr = (LazyArray)o;
+        List<Object> values = arr.getList();
+        for(Object value : values) {
+            assertEquals(LazyString.class, value.getClass());
+            String valueStr =((LazyString) value).getWritableObject().toString();
+            assertEquals("Sample value", valueStr);
+        }
+
+    }
+    @Test
+    public void testShouldStoreBagAsHiveArray() throws IOException, InterruptedException, SerDeException {
+        String loadString = "org.apache.pig.piggybank.storage.HiveColumnarLoader('f1 string,f2 string,f3 string')";
+        String storeString = "org.apache.pig.piggybank.storage.HiveColumnarStorage()";
+
+        String singlePartitionedFile = simpleDataFile.getAbsolutePath();
+        File outputFile = new File("testhiveColumnarStore");
+
+        PigServer server = new PigServer(ExecType.LOCAL);
+        server.setBatchOn();
+        server.registerQuery("a = LOAD '" + singlePartitionedFile + "' using " + loadString
+                + ";");
+        server.registerQuery("b = FOREACH a GENERATE f1, TOBAG(f2,f3);");
+
+        //when
+        server.store("b", outputFile.getAbsolutePath(), storeString);
+
+        //then
+        Path outputPath = new Path(outputFile.getAbsolutePath()+"/part-m-00000.rc");
+
+        ColumnarStruct struct = readRow(outputFile, outputPath, "f1 string,f2 array<string>");
+
+        assertEquals(2, struct.getFieldsAsList().size());
+        Object o =  struct.getField(0);
+        assertEquals(LazyString.class, o.getClass());
+        o =  struct.getField(1);
+        assertEquals(LazyArray.class, o.getClass());
+
+        LazyArray arr = (LazyArray)o;
+        List<Object> values = arr.getList();
+        for(Object value : values) {
+            assertEquals(LazyString.class, value.getClass());
+            String valueStr =((LazyString) value).getWritableObject().toString();
+            assertEquals("Sample value", valueStr);
+        }
+
+    }
+    @Test
+    public void testShouldStoreMapAsHiveMap() throws IOException, InterruptedException, SerDeException {
+        String loadString = "org.apache.pig.piggybank.storage.HiveColumnarLoader('f1 string,f2 string,f3 string')";
+        String storeString = "org.apache.pig.piggybank.storage.HiveColumnarStorage()";
+
+        String singlePartitionedFile = simpleDataFile.getAbsolutePath();
+        File outputFile = new File("testhiveColumnarStore");
+
+        PigServer server = new PigServer(ExecType.LOCAL);
+        server.setBatchOn();
+        server.registerQuery("a = LOAD '" + singlePartitionedFile + "' using " + loadString
+                + ";");
+        server.registerQuery("b = FOREACH a GENERATE f1, TOMAP(f2,f3);");
+
+        //when
+        server.store("b", outputFile.getAbsolutePath(), storeString);
+
+        //then
+        Path outputPath = new Path(outputFile.getAbsolutePath()+"/part-m-00000.rc");
+
+        ColumnarStruct struct = readRow(outputFile, outputPath, "f1 string,f2 map<string,string>");
+
+        assertEquals(2, struct.getFieldsAsList().size());
+        Object o =  struct.getField(0);
+        assertEquals(LazyString.class, o.getClass());
+        o =  struct.getField(1);
+        assertEquals(LazyMap.class, o.getClass());
+
+        LazyMap arr = (LazyMap)o;
+        Map<Object,Object> values = arr.getMap();
+        for(Entry<Object,Object> entry : values.entrySet()) {
+            assertEquals(LazyString.class, entry.getKey().getClass());
+            assertEquals(LazyString.class, entry.getValue().getClass());
+
+            String keyStr =((LazyString) entry.getKey()).getWritableObject().toString();
+            assertEquals("Sample value", keyStr);
+            String valueStr =((LazyString) entry.getValue()).getWritableObject().toString();
+            assertEquals("Sample value", valueStr);
+        }
+
+    }
+
+    private ColumnarStruct readRow(File outputFile, Path outputPath, String schema) throws IOException,
+            InterruptedException, SerDeException {
+
+        FileSplit fileSplit = new FileSplit(outputPath, 0L, outputFile.length(), (String[])null);
+
+
+        Path splitPath = fileSplit.getPath();
+
+        RCFileRecordReader<LongWritable, BytesRefArrayWritable> rcFileRecordReader = new RCFileRecordReader<LongWritable, BytesRefArrayWritable>(
+            new Configuration(false), new org.apache.hadoop.mapred.FileSplit(splitPath,
+                fileSplit.getStart(), fileSplit.getLength(),
+                new org.apache.hadoop.mapred.JobConf(conf)));
+
+        LongWritable key = rcFileRecordReader.createKey();
+        BytesRefArrayWritable value = rcFileRecordReader.createValue();
+        rcFileRecordReader.next(key, value);
+
+        ColumnarStruct struct = readColumnarStruct(value, schema);
+        return struct;
+    }
+
+    private ColumnarStruct readColumnarStruct(BytesRefArrayWritable buff, String schema) throws SerDeException {
+        Pattern pcols = Pattern.compile("[a-zA-Z_0-9]*[ ]");
+        List<String> types = HiveRCSchemaUtil.parseSchemaTypes(schema);
+        List<String> cols = HiveRCSchemaUtil.parseSchema(pcols, schema);
+
+        List<FieldSchema> fieldSchemaList = new ArrayList<FieldSchema>(
+            cols.size());
+
+        for (int i = 0; i < cols.size(); i++) {
+            fieldSchemaList.add(new FieldSchema(cols.get(i), HiveRCSchemaUtil
+                .findPigDataType(types.get(i))));
+        }
+
+        Properties props = new Properties();
+
+        props.setProperty(Constants.LIST_COLUMNS,
+            HiveRCSchemaUtil.listToString(cols));
+        props.setProperty(Constants.LIST_COLUMN_TYPES,
+            HiveRCSchemaUtil.listToString(types));
+
+        Configuration hiveConf = new HiveConf(conf, SessionState.class);
+        ColumnarSerDe serde = new ColumnarSerDe();
+        serde.initialize(hiveConf, props);
+
+        return (ColumnarStruct) serde.deserialize(buff);
+   }
+
+
+    /**
+     * Writes out a temporary RCFile plus a directory of such files, each with
+     * {@code columnCount} columns and {@code simpleRowCount} rows of "Sample value" cells.
+     *
+     * @throws SerDeException
+     * @throws IOException
+     */
+    private static final void produceSimpleData() throws SerDeException, IOException {
+        // produce on single file
+        simpleDataFile = File.createTempFile("testhiveColumnarLoader", ".txt");
+        simpleDataFile.deleteOnExit();
+
+        Path path = new Path(simpleDataFile.getPath());
+
+        writeRCFileTest(fs, simpleRowCount, path, columnCount, new DefaultCodec(), columnCount);
+
+        // produce a folder of simple data
+        simpleDataDir = new File("simpleDataDir" + System.currentTimeMillis());
+        simpleDataDir.mkdir();
+
+        for (int i = 0; i < simpleDirFileCount; i++) {
+
+            simpleDataFile = new File(simpleDataDir, "testhiveColumnarLoader-" + i + ".txt");
+
+            Path filePath = new Path(simpleDataFile.getPath());
+
+            writeRCFileTest(fs, simpleRowCount, filePath, columnCount, new DefaultCodec(),
+                    columnCount);
+
+        }
+
+    }
+
+    private static int writeRCFileTest(FileSystem fs, int rowCount, Path file, int columnNum,
+            CompressionCodec codec, int columnCount) throws IOException {
+        fs.delete(file, true);
+        int rowsWritten = 0;
+
+
+        RCFileOutputFormat.setColumnNumber(conf, columnNum);
+        RCFile.Writer writer = new RCFile.Writer(fs, conf, file, null, codec);
+
+        byte[][] columnRandom;
+
+        BytesRefArrayWritable bytes = new BytesRefArrayWritable(columnNum);
+        columnRandom = new byte[columnNum][];
+        for (int i = 0; i < columnNum; i++) {
+            BytesRefWritable cu = new BytesRefWritable();
+            bytes.set(i, cu);
+        }
+
+        for (int i = 0; i < rowCount; i++) {
+
+            bytes.resetValid(columnRandom.length);
+            for (int j = 0; j < columnRandom.length; j++) {
+                columnRandom[j]= "Sample value".getBytes();
+                bytes.get(j).set(columnRandom[j], 0, columnRandom[j].length);
+            }
+            rowsWritten++;
+            writer.append(bytes);
+        }
+        writer.close();
+
+        return rowsWritten;
+    }
+
+}