Posted to commits@pig.apache.org by da...@apache.org on 2013/05/03 19:16:55 UTC
svn commit: r1478879 - in /pig/trunk: ./
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/...
Author: daijy
Date: Fri May 3 17:16:54 2013
New Revision: 1478879
URL: http://svn.apache.org/r1478879
Log:
PIG-3308: Storing data in hive columnar rc format
Added:
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HiveColumnarStorage.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCOutputFormat.java
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHiveColumnarStorage.java
Modified:
pig/trunk/CHANGES.txt
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1478879&r1=1478878&r2=1478879&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri May 3 17:16:54 2013
@@ -28,6 +28,8 @@ PIG-3174: Remove rpm and deb artifacts
IMPROVEMENTS
+PIG-3308: Storing data in hive columnar rc format (maczech via daijy)
+
PIG-3303: add hadoop h2 artifact to publications in ivy.xml (julien)
PIG-3169: Remove intermediate data after a job finishes (mwagner via cheolsoo)
Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HiveColumnarStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HiveColumnarStorage.java?rev=1478879&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HiveColumnarStorage.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HiveColumnarStorage.java Fri May 3 17:16:54 2013
@@ -0,0 +1,215 @@
+package org.apache.pig.piggybank.storage;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.piggybank.storage.hiverc.HiveRCOutputFormat;
+
+public class HiveColumnarStorage extends PigStorage {
+ private static final String UTF8 = "UTF-8";
+
+ // Hive's default delimiters for nested data: ^B (\002) separates
+ // collection items, ^C (\003) separates a map key from its value
+ private static final char LIST_DELIMITER = 2;
+ private static final char MAP_DELIMITER = 3;
+
+ private int numColumns = -1;
+ private ByteStream.Output byteStream;
+ private BytesRefArrayWritable rowWritable;
+ private BytesRefWritable[] colValRefs;
+
+ @Override
+ public OutputFormat getOutputFormat() {
+ return new HiveRCOutputFormat();
+ }
+
+ @Override
+ public void setStoreLocation(String location, Job job) throws IOException {
+ super.setStoreLocation(location, job);
+ // pick up the column count recorded in the UDF context by checkSchema.
+ Properties p = getUDFProperties();
+ if (p != null) {
+ numColumns = Integer.parseInt(p.getProperty("numColumns", "-1"));
+ }
+
+ if (numColumns > 0) {
+ RCFileOutputFormat.setColumnNumber(job.getConfiguration(), numColumns);
+ }
+ }
+
+ @Override
+ public void checkSchema(ResourceSchema s) throws IOException {
+ super.checkSchema(s);
+ getUDFProperties().setProperty("numColumns", Integer.toString(s.getFields().length));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void putNext(Tuple t) throws IOException {
+
+ if (rowWritable == null) { // initialize
+ if (numColumns < 1) {
+ throw new IOException("number of columns is not set");
+ }
+
+ byteStream = new ByteStream.Output();
+ rowWritable = new BytesRefArrayWritable();
+ colValRefs = new BytesRefWritable[numColumns];
+
+ for (int i = 0; i < numColumns; i++) {
+ colValRefs[i] = new BytesRefWritable();
+ rowWritable.set(i, colValRefs[i]);
+ }
+ }
+
+ byteStream.reset();
+
+ int sz = t.size();
+ int startPos = 0;
+
+ for (int i = 0; i < sz && i < numColumns; i++) {
+
+ putField(byteStream, t.get(i));
+ colValRefs[i].set(byteStream.getData(), startPos, byteStream.getCount() - startPos);
+ startPos = byteStream.getCount();
+ }
+
+ try {
+ writer.write(null, rowWritable);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private Properties getUDFProperties() {
+ return UDFContext.getUDFContext().getUDFProperties(this.getClass(),
+ new String[] { signature });
+ }
+
+ public void putField(OutputStream out, Object field) throws IOException {
+
+ switch (DataType.findType(field)) {
+ case DataType.NULL:
+ break; // just leave it empty
+
+ case DataType.BOOLEAN:
+ out.write(((Boolean) field).toString().getBytes());
+ break;
+
+ case DataType.INTEGER:
+ out.write(((Integer) field).toString().getBytes());
+ break;
+
+ case DataType.LONG:
+ out.write(((Long) field).toString().getBytes());
+ break;
+
+ case DataType.FLOAT:
+ out.write(((Float) field).toString().getBytes());
+ break;
+
+ case DataType.DOUBLE:
+ out.write(((Double) field).toString().getBytes());
+ break;
+
+ case DataType.BYTEARRAY:
+ byte[] b = ((DataByteArray) field).get();
+ out.write(b, 0, b.length);
+ break;
+
+ case DataType.CHARARRAY:
+ out.write(((String) field).getBytes(UTF8));
+ break;
+
+ case DataType.MAP:
+ case DataType.INTERNALMAP:
+ // both map types use the same encoding: entries joined by the
+ // collection delimiter, key and value joined by the map-key delimiter
+ boolean mapHasNext = false;
+ Map<String, Object> m = (Map<String, Object>) field;
+
+ for (Map.Entry<String, Object> e : m.entrySet()) {
+ if (mapHasNext) {
+ out.write(LIST_DELIMITER);
+ } else {
+ mapHasNext = true;
+ }
+ putField(out, e.getKey());
+ out.write(MAP_DELIMITER);
+ putField(out, e.getValue());
+ }
+
+ break;
+
+ case DataType.TUPLE:
+ boolean tupleHasNext = false;
+ Tuple t = (Tuple) field;
+
+ for (int i = 0; i < t.size(); ++i) {
+ if (tupleHasNext) {
+ out.write(LIST_DELIMITER);
+ } else {
+ tupleHasNext = true;
+ }
+ // t.get(i) throws ExecException, an IOException subclass,
+ // which simply propagates to the caller
+ putField(out, t.get(i));
+ }
+
+ break;
+
+ case DataType.BAG:
+ boolean bagHasNext = false;
+ Iterator<Tuple> tupleIter = ((DataBag) field).iterator();
+ while (tupleIter.hasNext()) {
+ if (bagHasNext) {
+ out.write(LIST_DELIMITER);
+ } else {
+ bagHasNext = true;
+ }
+ putField(out, tupleIter.next());
+ }
+
+ break;
+
+ default: {
+ int errCode = 2108;
+ String msg = "Could not determine data type of field: " + field;
+ throw new ExecException(msg, errCode, PigException.BUG);
+ }
+
+ }
+ }
+
+}
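
For a concrete picture of the delimiter scheme putField uses, here is a
minimal standalone sketch (the class name PutFieldDemo is hypothetical, not
part of this commit) that serializes a two-entry map; entries come out joined
by ^B and key/value pairs by ^C, matching Hive's default collection
delimiters:

    import java.io.ByteArrayOutputStream;
    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.pig.piggybank.storage.HiveColumnarStorage;

    public class PutFieldDemo {
        public static void main(String[] args) throws Exception {
            HiveColumnarStorage storage = new HiveColumnarStorage();
            ByteArrayOutputStream out = new ByteArrayOutputStream();

            Map<String, Object> m = new LinkedHashMap<String, Object>();
            m.put("k1", "v1");
            m.put("k2", "v2");

            // raw bytes written: k1^Cv1^Bk2^Cv2
            storage.putField(out, m);

            // print with visible stand-ins for the control characters
            System.out.println(out.toString("UTF-8")
                    .replace('\u0003', ':')
                    .replace('\u0002', '|')); // prints k1:v1|k2:v2
        }
    }
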
Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCOutputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCOutputFormat.java?rev=1478879&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCOutputFormat.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCOutputFormat.java Fri May 3 17:16:54 2013
@@ -0,0 +1,113 @@
+package org.apache.pig.piggybank.storage.hiverc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile.Metadata;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveRCOutputFormat extends FileOutputFormat<NullWritable, Writable> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HiveRCOutputFormat.class);
+
+ public static final String COMPRESSION_CODEC_CONF = "rcfile.output.compression.codec";
+
+ public static final String DEFAULT_EXTENSION = ".rc";
+ public static final String EXTENSION_OVERRIDE_CONF = "rcfile.output.filename.extension"; // "none" disables it.
+
+ /**
+ * Sets the number of columns in the given configuration.
+ *
+ * @param conf
+ * configuration instance in which to set the column number
+ * @param columnNum
+ * number of columns for RCFile's Writer
+ *
+ */
+ public static void setColumnNumber(Configuration conf, int columnNum) {
+ assert columnNum > 0;
+ conf.setInt(RCFile.COLUMN_NUMBER_CONF_STR, columnNum);
+ }
+
+ /**
+ * Returns the number of columns set in the conf for writers.
+ *
+ * @param conf
+ * configuration instance from which to read the column number
+ * @return number of columns for RCFile's writer
+ */
+ public static int getColumnNumber(Configuration conf) {
+ return conf.getInt(RCFile.COLUMN_NUMBER_CONF_STR, 0);
+ }
+
+ protected RCFile.Writer createRCFileWriter(TaskAttemptContext job,
+ Text columnMetadata)
+ throws IOException {
+ Configuration conf = job.getConfiguration();
+
+ // override compression codec if set.
+ String codecOverride = conf.get(COMPRESSION_CODEC_CONF);
+ if (codecOverride != null) {
+ conf.setBoolean("mapred.output.compress", true);
+ conf.set("mapred.output.compression.codec", codecOverride);
+ }
+
+ CompressionCodec codec = null;
+ if (getCompressOutput(job)) {
+ Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
+ codec = ReflectionUtils.newInstance(codecClass, conf);
+ }
+
+ Metadata metadata = null;
+
+ String ext = conf.get(EXTENSION_OVERRIDE_CONF, DEFAULT_EXTENSION);
+ Path file = getDefaultWorkFile(job, ext.equalsIgnoreCase("none") ? null : ext);
+
+ LOG.info("writing to rcfile " + file.toString());
+
+ return new RCFile.Writer(file.getFileSystem(conf), conf, file, job, metadata, codec);
+ }
+
+ /**
+ * RecordWriter wrapper around an RCFile.Writer
+ */
+ protected static class Writer extends RecordWriter<NullWritable, Writable> {
+
+ private final RCFile.Writer rcfile;
+
+ protected Writer(HiveRCOutputFormat outputFormat,
+ TaskAttemptContext job,
+ Text columnMetadata) throws IOException {
+ rcfile = outputFormat.createRCFileWriter(job, columnMetadata);
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+ rcfile.close();
+ }
+
+ @Override
+ public void write(NullWritable key, Writable value) throws IOException, InterruptedException {
+ rcfile.append(value);
+ }
+ }
+
+ @Override
+ public RecordWriter<NullWritable, Writable> getRecordWriter(
+ TaskAttemptContext job) throws IOException, InterruptedException {
+ return new Writer(this, job, null);
+ }
+
+}
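
A short sketch of how the two configuration hooks above can be exercised
(the column count of 3 and the class name HiveRCConfDemo are illustrative;
the conf keys and the setColumnNumber helper come straight from the class):

    import org.apache.hadoop.conf.Configuration;

    import org.apache.pig.piggybank.storage.hiverc.HiveRCOutputFormat;

    public class HiveRCConfDemo {
        public static void main(String[] args) {
            Configuration conf = new Configuration();

            // force gzip compression regardless of the job's default;
            // createRCFileWriter() will flip mapred.output.compress to true
            conf.set(HiveRCOutputFormat.COMPRESSION_CODEC_CONF,
                    "org.apache.hadoop.io.compress.GzipCodec");

            // "none" suppresses the default ".rc" filename extension
            conf.set(HiveRCOutputFormat.EXTENSION_OVERRIDE_CONF, "none");

            // RCFile.Writer needs the column count before the first append
            HiveRCOutputFormat.setColumnNumber(conf, 3);
            System.out.println("columns = " + HiveRCOutputFormat.getColumnNumber(conf));
        }
    }
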
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHiveColumnarStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHiveColumnarStorage.java?rev=1478879&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHiveColumnarStorage.java (added)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHiveColumnarStorage.java Fri May 3 17:16:54 2013
@@ -0,0 +1,335 @@
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileRecordReader;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarStruct;
+import org.apache.hadoop.hive.serde2.lazy.LazyArray;
+import org.apache.hadoop.hive.serde2.lazy.LazyMap;
+import org.apache.hadoop.hive.serde2.lazy.LazyString;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.piggybank.storage.hiverc.HiveRCSchemaUtil;
+import org.apache.pig.test.Util;
+import org.junit.Test;
+
+public class TestHiveColumnarStorage extends TestCase {
+
+ static Configuration conf = null;
+ static private FileSystem fs;
+
+ static File simpleDataFile = null;
+ static File simpleDataDir = null;
+
+ static int simpleDirFileCount = 3;
+ static int simpleRowCount = 10;
+ static int columnCount = 3;
+
+
+ @Override
+ public synchronized void setUp() throws Exception {
+
+ conf = new Configuration();
+
+ fs = LocalFileSystem.getLocal(conf);
+
+ produceSimpleData();
+ // Util.deleteDirectory(new File("testhiveColumnarStore"));
+ }
+
+ @Override
+ public void tearDown() {
+ Util.deleteDirectory(simpleDataDir);
+ Util.deleteDirectory(new File("testhiveColumnarStore"));
+ simpleDataFile.delete();
+ }
+
+ @Test
+ public void testShouldStoreRowInHiveFormat() throws IOException, InterruptedException, SerDeException {
+ String loadString = "org.apache.pig.piggybank.storage.HiveColumnarLoader('f1 string,f2 string,f3 string')";
+ String storeString = "org.apache.pig.piggybank.storage.HiveColumnarStorage()";
+
+ String singlePartitionedFile = simpleDataFile.getAbsolutePath();
+ File outputFile = new File("testhiveColumnarStore");
+
+ PigServer server = new PigServer(ExecType.LOCAL);
+ server.setBatchOn();
+ server.registerQuery("a = LOAD '" + singlePartitionedFile + "' using " + loadString
+ + ";");
+
+ //when
+ server.store("a", outputFile.getAbsolutePath(), storeString);
+
+ //then
+ Path outputPath = new Path(outputFile.getAbsolutePath()+"/part-m-00000.rc");
+
+ ColumnarStruct struct = readRow(outputFile, outputPath, "f1 string,f2 string,f3 string");
+
+ assertEquals(3, struct.getFieldsAsList().size());
+ Object o = struct.getField(0);
+ assertEquals(LazyString.class, o.getClass());
+ o = struct.getField(1);
+ assertEquals(LazyString.class, o.getClass());
+ o = struct.getField(2);
+ assertEquals(LazyString.class, o.getClass());
+
+ }
+ @Test
+ public void testShouldStoreTupleAsHiveArray() throws IOException, InterruptedException, SerDeException {
+ String loadString = "org.apache.pig.piggybank.storage.HiveColumnarLoader('f1 string,f2 string,f3 string')";
+ String storeString = "org.apache.pig.piggybank.storage.HiveColumnarStorage()";
+
+ String singlePartitionedFile = simpleDataFile.getAbsolutePath();
+ File outputFile = new File("testhiveColumnarStore");
+
+ PigServer server = new PigServer(ExecType.LOCAL);
+ server.setBatchOn();
+ server.registerQuery("a = LOAD '" + singlePartitionedFile + "' using " + loadString
+ + ";");
+ server.registerQuery("b = FOREACH a GENERATE f1, TOTUPLE(f2,f3);");
+
+ //when
+ server.store("b", outputFile.getAbsolutePath(), storeString);
+
+ //then
+ Path outputPath = new Path(outputFile.getAbsolutePath()+"/part-m-00000.rc");
+
+ ColumnarStruct struct = readRow(outputFile, outputPath, "f1 string,f2 array<string>");
+
+ assertEquals(2, struct.getFieldsAsList().size());
+ Object o = struct.getField(0);
+ assertEquals(LazyString.class, o.getClass());
+ o = struct.getField(1);
+ assertEquals(LazyArray.class, o.getClass());
+
+ LazyArray arr = (LazyArray)o;
+ List<Object> values = arr.getList();
+ for(Object value : values) {
+ assertEquals(LazyString.class, value.getClass());
+ String valueStr =((LazyString) value).getWritableObject().toString();
+ assertEquals("Sample value", valueStr);
+ }
+
+ }
+ @Test
+ public void testShouldStoreBagAsHiveArray() throws IOException, InterruptedException, SerDeException {
+ String loadString = "org.apache.pig.piggybank.storage.HiveColumnarLoader('f1 string,f2 string,f3 string')";
+ String storeString = "org.apache.pig.piggybank.storage.HiveColumnarStorage()";
+
+ String singlePartitionedFile = simpleDataFile.getAbsolutePath();
+ File outputFile = new File("testhiveColumnarStore");
+
+ PigServer server = new PigServer(ExecType.LOCAL);
+ server.setBatchOn();
+ server.registerQuery("a = LOAD '" + singlePartitionedFile + "' using " + loadString
+ + ";");
+ server.registerQuery("b = FOREACH a GENERATE f1, TOBAG(f2,f3);");
+
+ //when
+ server.store("b", outputFile.getAbsolutePath(), storeString);
+
+ //then
+ Path outputPath = new Path(outputFile.getAbsolutePath()+"/part-m-00000.rc");
+
+ ColumnarStruct struct = readRow(outputFile, outputPath, "f1 string,f2 array<string>");
+
+ assertEquals(2, struct.getFieldsAsList().size());
+ Object o = struct.getField(0);
+ assertEquals(LazyString.class, o.getClass());
+ o = struct.getField(1);
+ assertEquals(LazyArray.class, o.getClass());
+
+ LazyArray arr = (LazyArray)o;
+ List<Object> values = arr.getList();
+ for(Object value : values) {
+ assertEquals(LazyString.class, value.getClass());
+ String valueStr =((LazyString) value).getWritableObject().toString();
+ assertEquals("Sample value", valueStr);
+ }
+
+ }
+ @Test
+ public void testShouldStoreMapAsHiveMap() throws IOException, InterruptedException, SerDeException {
+ String loadString = "org.apache.pig.piggybank.storage.HiveColumnarLoader('f1 string,f2 string,f3 string')";
+ String storeString = "org.apache.pig.piggybank.storage.HiveColumnarStorage()";
+
+ String singlePartitionedFile = simpleDataFile.getAbsolutePath();
+ File outputFile = new File("testhiveColumnarStore");
+
+ PigServer server = new PigServer(ExecType.LOCAL);
+ server.setBatchOn();
+ server.registerQuery("a = LOAD '" + singlePartitionedFile + "' using " + loadString
+ + ";");
+ server.registerQuery("b = FOREACH a GENERATE f1, TOMAP(f2,f3);");
+
+ //when
+ server.store("b", outputFile.getAbsolutePath(), storeString);
+
+ //then
+ Path outputPath = new Path(outputFile.getAbsolutePath()+"/part-m-00000.rc");
+
+ ColumnarStruct struct = readRow(outputFile, outputPath, "f1 string,f2 map<string,string>");
+
+ assertEquals(2, struct.getFieldsAsList().size());
+ Object o = struct.getField(0);
+ assertEquals(LazyString.class, o.getClass());
+ o = struct.getField(1);
+ assertEquals(LazyMap.class, o.getClass());
+
+ LazyMap arr = (LazyMap)o;
+ Map<Object,Object> values = arr.getMap();
+ for(Entry<Object,Object> entry : values.entrySet()) {
+ assertEquals(LazyString.class, entry.getKey().getClass());
+ assertEquals(LazyString.class, entry.getValue().getClass());
+
+ String keyStr =((LazyString) entry.getKey()).getWritableObject().toString();
+ assertEquals("Sample value", keyStr);
+ String valueStr =((LazyString) entry.getValue()).getWritableObject().toString();
+ assertEquals("Sample value", valueStr);
+ }
+
+ }
+
+ private ColumnarStruct readRow(File outputFile, Path outputPath, String schema) throws IOException,
+ InterruptedException, SerDeException {
+
+ FileSplit fileSplit = new FileSplit(outputPath, 0L, outputFile.length(), (String[])null);
+
+
+ Path splitPath = fileSplit.getPath();
+
+ RCFileRecordReader<LongWritable, BytesRefArrayWritable> rcFileRecordReader = new RCFileRecordReader<LongWritable, BytesRefArrayWritable>(
+ new Configuration(false), new org.apache.hadoop.mapred.FileSplit(splitPath,
+ fileSplit.getStart(), fileSplit.getLength(),
+ new org.apache.hadoop.mapred.JobConf(conf)));
+
+ LongWritable key = rcFileRecordReader.createKey();
+ BytesRefArrayWritable value = rcFileRecordReader.createValue();
+ rcFileRecordReader.next(key, value);
+
+ ColumnarStruct struct = readColumnarStruct(value, schema);
+ return struct;
+ }
+
+ private ColumnarStruct readColumnarStruct(BytesRefArrayWritable buff, String schema) throws SerDeException {
+ Pattern pcols = Pattern.compile("[a-zA-Z_0-9]*[ ]");
+ List<String> types = HiveRCSchemaUtil.parseSchemaTypes(schema);
+ List<String> cols = HiveRCSchemaUtil.parseSchema(pcols, schema);
+
+ List<FieldSchema> fieldSchemaList = new ArrayList<FieldSchema>(
+ cols.size());
+
+ for (int i = 0; i < cols.size(); i++) {
+ fieldSchemaList.add(new FieldSchema(cols.get(i), HiveRCSchemaUtil
+ .findPigDataType(types.get(i))));
+ }
+
+ Properties props = new Properties();
+
+ props.setProperty(Constants.LIST_COLUMNS,
+ HiveRCSchemaUtil.listToString(cols));
+ props.setProperty(Constants.LIST_COLUMN_TYPES,
+ HiveRCSchemaUtil.listToString(types));
+
+ Configuration hiveConf = new HiveConf(conf, SessionState.class);
+ ColumnarSerDe serde = new ColumnarSerDe();
+ serde.initialize(hiveConf, props);
+
+ return (ColumnarStruct) serde.deserialize(buff);
+ }
+
+
+ /**
+ * Writes out simple temporary data: a single file plus a folder of files,
+ * each holding {@link #columnCount} columns and {@link #simpleRowCount} rows.<br/>
+ * Every cell contains the string "Sample value".
+ *
+ * @throws SerDeException
+ * @throws IOException
+ */
+ private static final void produceSimpleData() throws SerDeException, IOException {
+ // produce a single data file
+ simpleDataFile = File.createTempFile("testhiveColumnarLoader", ".txt");
+ simpleDataFile.deleteOnExit();
+
+ Path path = new Path(simpleDataFile.getPath());
+
+ writeRCFileTest(fs, simpleRowCount, path, columnCount, new DefaultCodec());
+
+ // produce a folder of simple data
+ simpleDataDir = new File("simpleDataDir" + System.currentTimeMillis());
+ simpleDataDir.mkdir();
+
+ for (int i = 0; i < simpleDirFileCount; i++) {
+
+ simpleDataFile = new File(simpleDataDir, "testhiveColumnarLoader-" + i + ".txt");
+
+ Path filePath = new Path(simpleDataFile.getPath());
+
+ writeRCFileTest(fs, simpleRowCount, filePath, columnCount, new DefaultCodec());
+
+ }
+
+ }
+
+ private static int writeRCFileTest(FileSystem fs, int rowCount, Path file, int columnNum,
+ CompressionCodec codec) throws IOException {
+ fs.delete(file, true);
+ int rowsWritten = 0;
+
+
+ RCFileOutputFormat.setColumnNumber(conf, columnNum);
+ RCFile.Writer writer = new RCFile.Writer(fs, conf, file, null, codec);
+
+ byte[][] columnRandom;
+
+ BytesRefArrayWritable bytes = new BytesRefArrayWritable(columnNum);
+ columnRandom = new byte[columnNum][];
+ for (int i = 0; i < columnNum; i++) {
+ BytesRefWritable cu = new BytesRefWritable();
+ bytes.set(i, cu);
+ }
+
+ for (int i = 0; i < rowCount; i++) {
+
+ bytes.resetValid(columnRandom.length);
+ for (int j = 0; j < columnRandom.length; j++) {
+ columnRandom[j]= "Sample value".getBytes();
+ bytes.get(j).set(columnRandom[j], 0, columnRandom[j].length);
+ }
+ rowsWritten++;
+ writer.append(bytes);
+ }
+ writer.close();
+
+ return rowsWritten;
+ }
+
+}
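
End to end, the storer is driven the same way the tests above drive it; a
minimal sketch (the input/output paths, the three-column schema, and the
class name HiveColumnarRoundTrip are illustrative only, and the piggybank
jar must be on the classpath):

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class HiveColumnarRoundTrip {
        public static void main(String[] args) throws Exception {
            PigServer server = new PigServer(ExecType.LOCAL);
            server.setBatchOn();

            // read an existing RCFile with the matching loader
            server.registerQuery("a = LOAD 'input.rc' USING "
                    + "org.apache.pig.piggybank.storage.HiveColumnarLoader("
                    + "'f1 string,f2 string,f3 string');");

            // write it back out in Hive's columnar RC format
            server.store("a", "output",
                    "org.apache.pig.piggybank.storage.HiveColumnarStorage()");
        }
    }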