Posted to commits@orc.apache.org by om...@apache.org on 2016/06/02 03:59:04 UTC
[1/2] orc git commit: ORC-52 Add support for org.apache.hadoop.mapreduce InputFormat and OutputFormat. (omalley)
Repository: orc
Updated Branches:
refs/heads/master 545fe3712 -> 3bb5ce532
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java
new file mode 100644
index 0000000..f686e05
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.mapreduce;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapred.OrcMapredRecordReader;
+import org.apache.orc.mapred.OrcStruct;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * This record reader implements the org.apache.hadoop.mapreduce API.
+ * It shares its value conversion with the mapred API record reader through
+ * the helpers in the org.apache.orc.mapred package.
+ * @param <V> the root type of the file
+ */
+public class OrcMapreduceRecordReader<V extends WritableComparable>
+ extends org.apache.hadoop.mapreduce.RecordReader<NullWritable, V> {
+ private final TypeDescription schema;
+ private final RecordReader batchReader;
+ private final VectorizedRowBatch batch;
+ private int rowInBatch;
+ private final V row;
+
+ public OrcMapreduceRecordReader(Reader fileReader,
+ Reader.Options options) throws IOException {
+ this.batchReader = fileReader.rows(options);
+ if (options.getSchema() == null) {
+ schema = fileReader.getSchema();
+ } else {
+ schema = options.getSchema();
+ }
+ this.batch = schema.createRowBatch();
+ rowInBatch = 0;
+ this.row = (V) OrcStruct.createValue(schema);
+ }
+
+ /**
+ * If the current batch has been fully consumed, read the next one.
+ * @return true if more rows are available.
+ * @throws IOException if reading the next batch fails
+ */
+ boolean ensureBatch() throws IOException {
+ if (rowInBatch >= batch.size) {
+ rowInBatch = 0;
+ return batchReader.nextBatch(batch);
+ }
+ return true;
+ }
+
+ @Override
+ public void close() throws IOException {
+ batchReader.close();
+ }
+
+ @Override
+ public void initialize(InputSplit inputSplit,
+ TaskAttemptContext taskAttemptContext) {
+ // nothing required
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (!ensureBatch()) {
+ return false;
+ }
+ if (schema.getCategory() == TypeDescription.Category.STRUCT) {
+ OrcStruct result = (OrcStruct) row;
+ List<TypeDescription> children = schema.getChildren();
+ int numberOfChildren = children.size();
+ for(int i=0; i < numberOfChildren; ++i) {
+ result.setFieldValue(i, OrcMapredRecordReader.nextValue(batch.cols[i], rowInBatch,
+ children.get(i), result.getFieldValue(i)));
+ }
+ } else {
+ OrcMapredRecordReader.nextValue(batch.cols[0], rowInBatch, schema, row);
+ }
+ rowInBatch += 1;
+ return true;
+ }
+
+ @Override
+ public NullWritable getCurrentKey() throws IOException, InterruptedException {
+ return NullWritable.get();
+ }
+
+ @Override
+ public V getCurrentValue() throws IOException, InterruptedException {
+ return row;
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return batchReader.getProgress();
+ }
+}
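For context, a minimal read-side sketch (not from the patch itself) that drives the new record reader directly, the way the tests later in this commit do; the file name "data.orc" and its struct<i:int,s:string> schema are illustrative assumptions.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.TaskType;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
    import org.apache.orc.mapred.OrcStruct;
    import org.apache.orc.mapreduce.OrcInputFormat;

    public class OrcReadSketch {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        TaskAttemptID id = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
        TaskAttemptContext context = new TaskAttemptContextImpl(conf, id);
        // Read the whole file as a single split; "data.orc" is a stand-in path.
        FileSplit split = new FileSplit(new Path("data.orc"), 0, Long.MAX_VALUE,
            new String[0]);
        RecordReader<NullWritable, OrcStruct> reader =
            new OrcInputFormat<OrcStruct>().createRecordReader(split, context);
        while (reader.nextKeyValue()) {
          OrcStruct row = reader.getCurrentValue();
          System.out.println(row.getFieldValue(0) + "\t" + row.getFieldValue(1));
        }
        reader.close();
      }
    }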
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordWriter.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordWriter.java b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordWriter.java
new file mode 100644
index 0000000..9379584
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordWriter.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapreduce;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.mapred.OrcKey;
+import org.apache.orc.mapred.OrcMapredRecordWriter;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+
+import java.io.IOException;
+
+public class OrcMapreduceRecordWriter<V extends Writable>
+ extends RecordWriter<NullWritable, V> {
+
+ private final Writer writer;
+ private final VectorizedRowBatch batch;
+ private final TypeDescription schema;
+ private final boolean isTopStruct;
+
+ public OrcMapreduceRecordWriter(Writer writer) {
+ this.writer = writer;
+ schema = writer.getSchema();
+ this.batch = schema.createRowBatch();
+ isTopStruct = schema.getCategory() == TypeDescription.Category.STRUCT;
+ }
+
+ @Override
+ public void write(NullWritable nullWritable, V v) throws IOException {
+ // if the batch is full, write it out.
+ if (batch.size == batch.getMaxSize()) {
+ writer.addRowBatch(batch);
+ batch.reset();
+ }
+
+ // add the new row
+ int row = batch.size++;
+ // skip over the OrcKey or OrcValue
+ if (v instanceof OrcKey) {
+ v = (V)((OrcKey) v).key;
+ } else if (v instanceof OrcValue) {
+ v = (V)((OrcValue) v).value;
+ }
+ if (isTopStruct) {
+ for(int f=0; f < schema.getChildren().size(); ++f) {
+ OrcMapredRecordWriter.setColumn(schema.getChildren().get(f),
+ batch.cols[f], row, ((OrcStruct) v).getFieldValue(f));
+ }
+ } else {
+ OrcMapredRecordWriter.setColumn(schema, batch.cols[0], row, v);
+ }
+ }
+
+ @Override
+ public void close(TaskAttemptContext taskAttemptContext) throws IOException {
+ if (batch.size != 0) {
+ writer.addRowBatch(batch);
+ }
+ writer.close();
+ }
+}
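A matching write-side sketch (again, not part of the patch): it hands an ORC Writer straight to the record writer to show the batching behavior above, with rows buffered in the VectorizedRowBatch until it fills and the remainder flushed on close. The output path "rows.orc" and the two-column schema are illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.orc.OrcFile;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.Writer;
    import org.apache.orc.mapred.OrcStruct;
    import org.apache.orc.mapreduce.OrcMapreduceRecordWriter;

    public class OrcWriteSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        TypeDescription schema =
            TypeDescription.fromString("struct<x:int,y:string>");
        Writer fileWriter = OrcFile.createWriter(new Path("rows.orc"),
            OrcFile.writerOptions(conf).setSchema(schema));
        OrcMapreduceRecordWriter<OrcStruct> recordWriter =
            new OrcMapreduceRecordWriter<>(fileWriter);
        OrcStruct row = (OrcStruct) OrcStruct.createValue(schema);
        NullWritable nada = NullWritable.get();
        for (int r = 0; r < 10; ++r) {
          row.setAllFields(new IntWritable(r), new Text("row-" + r));
          recordWriter.write(nada, row);  // buffered until the batch fills
        }
        // The context is unused by close(), which flushes the partial batch
        // and closes the file.
        recordWriter.close(null);
      }
    }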
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcOutputFormat.java b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcOutputFormat.java
new file mode 100644
index 0000000..797998c
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcOutputFormat.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+
+import java.io.IOException;
+
+/**
+ * An ORC output format that satisfies the org.apache.hadoop.mapreduce API.
+ */
+public class OrcOutputFormat<V extends Writable>
+ extends FileOutputFormat<NullWritable, V> {
+ private static final String EXTENSION = ".orc";
+ // This is useful for unit tests or local runs where you don't need the
+ // output committer.
+ public static final String SKIP_TEMP_DIRECTORY =
+ "orc.mapreduce.output.skip-temporary-directory";
+
+ @Override
+ public RecordWriter<NullWritable, V>
+ getRecordWriter(TaskAttemptContext taskAttemptContext
+ ) throws IOException {
+ Configuration conf = taskAttemptContext.getConfiguration();
+ Path filename = getDefaultWorkFile(taskAttemptContext, EXTENSION);
+ Writer writer = OrcFile.createWriter(filename,
+ org.apache.orc.mapred.OrcOutputFormat.buildOptions(conf));
+ return new OrcMapreduceRecordWriter<V>(writer);
+ }
+
+ @Override
+ public Path getDefaultWorkFile(TaskAttemptContext context,
+ String extension) throws IOException {
+ if (context.getConfiguration().getBoolean(SKIP_TEMP_DIRECTORY, false)) {
+ return new Path(getOutputPath(context),
+ getUniqueFile(context, getOutputName(context), extension));
+ } else {
+ return super.getDefaultWorkFile(context, extension);
+ }
+ }
+}
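In a job driver, the output format would be wired up roughly as follows. This is a sketch under assumed paths, schema, and job name, not code from the patch, and it omits the input and mapper setup; SKIP_TEMP_DIRECTORY is set only to illustrate the committer-less local mode described above.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.orc.OrcConf;
    import org.apache.orc.mapred.OrcStruct;
    import org.apache.orc.mapreduce.OrcOutputFormat;

    public class OrcJobSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The writer takes its file schema from this setting via buildOptions().
        OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf,
            "struct<name:string,age:int>");
        // Optional: bypass the _temporary directory for local tests.
        conf.setBoolean(OrcOutputFormat.SKIP_TEMP_DIRECTORY, true);
        Job job = Job.getInstance(conf, "write-orc");
        job.setOutputFormatClass(OrcOutputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(OrcStruct.class);
        FileOutputFormat.setOutputPath(job, new Path("/tmp/orc-out"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }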
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/test/org/apache/orc/mapred/TestMrUnit.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestMrUnit.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestMrUnit.java
new file mode 100644
index 0000000..cd11603
--- /dev/null
+++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestMrUnit.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapred;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mrunit.MapReduceDriver;
+import org.apache.orc.OrcConf;
+import org.apache.orc.TypeDescription;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
+
+public class TestMrUnit {
+ JobConf conf = new JobConf();
+
+ /**
+ * Split the input struct into its two parts.
+ */
+ public static class MyMapper
+ implements Mapper<NullWritable, OrcStruct, OrcKey, OrcValue> {
+ private OrcKey keyWrapper = new OrcKey();
+ private OrcValue valueWrapper = new OrcValue();
+
+ @Override
+ public void map(NullWritable key, OrcStruct value,
+ OutputCollector<OrcKey, OrcValue> outputCollector,
+ Reporter reporter) throws IOException {
+ keyWrapper.key = value.getFieldValue(0);
+ valueWrapper.value = value.getFieldValue(1);
+ outputCollector.collect(keyWrapper, valueWrapper);
+ }
+
+ @Override
+ public void close() throws IOException {
+ // PASS
+ }
+
+ @Override
+ public void configure(JobConf jobConf) {
+ // PASS
+ }
+ }
+
+ /**
+ * Glue the key and values back together.
+ */
+ public static class MyReducer
+ implements Reducer<OrcKey, OrcValue, NullWritable, OrcStruct> {
+ private OrcStruct output = new OrcStruct(TypeDescription.fromString
+ ("struct<first:struct<x:int,y:int>,second:struct<z:string>>"));
+ private final NullWritable nada = NullWritable.get();
+
+ @Override
+ public void reduce(OrcKey key, Iterator<OrcValue> iterator,
+ OutputCollector<NullWritable, OrcStruct> collector,
+ Reporter reporter) throws IOException {
+ output.setFieldValue(0, key.key);
+ while (iterator.hasNext()) {
+ OrcValue value = iterator.next();
+ output.setFieldValue(1, value.value);
+ collector.collect(nada, output);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ // PASS
+ }
+
+ @Override
+ public void configure(JobConf jobConf) {
+ // PASS
+ }
+ }
+
+ /**
+ * This class is intended to support MRUnit's object copying for input and
+ * output objects.
+ *
+ * Real mapreduce contexts should NEVER use this class.
+ *
+ * The type string is serialized before each value.
+ */
+ public static class OrcStructSerialization
+ implements Serialization<OrcStruct> {
+
+ @Override
+ public boolean accept(Class<?> cls) {
+ return OrcStruct.class.isAssignableFrom(cls);
+ }
+
+ @Override
+ public Serializer<OrcStruct> getSerializer(Class<OrcStruct> aClass) {
+ return new Serializer<OrcStruct>() {
+ DataOutputStream dataOut;
+
+ public void open(OutputStream out) {
+ if(out instanceof DataOutputStream) {
+ dataOut = (DataOutputStream)out;
+ } else {
+ dataOut = new DataOutputStream(out);
+ }
+ }
+
+ public void serialize(OrcStruct w) throws IOException {
+ Text.writeString(dataOut, w.getSchema().toString());
+ w.write(dataOut);
+ }
+
+ public void close() throws IOException {
+ dataOut.close();
+ }
+ };
+ }
+
+ @Override
+ public Deserializer<OrcStruct> getDeserializer(Class<OrcStruct> aClass) {
+ return new Deserializer<OrcStruct>() {
+ DataInputStream input;
+
+ @Override
+ public void open(InputStream inputStream) throws IOException {
+ if(inputStream instanceof DataInputStream) {
+ input = (DataInputStream)inputStream;
+ } else {
+ input = new DataInputStream(inputStream);
+ }
+ }
+
+ @Override
+ public OrcStruct deserialize(OrcStruct orcStruct) throws IOException {
+ String typeStr = Text.readString(input);
+ OrcStruct result = new OrcStruct(TypeDescription.fromString(typeStr));
+ result.readFields(input);
+ return result;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // PASS
+ }
+ };
+ }
+ }
+
+ @Test
+ public void testMapred() throws IOException {
+ conf.set("io.serializations",
+ OrcStructSerialization.class.getName() + "," +
+ WritableSerialization.class.getName());
+ OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.setString(conf, "struct<x:int,y:int>");
+ OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(conf, "struct<z:string>");
+ MyMapper mapper = new MyMapper();
+ mapper.configure(conf);
+ MyReducer reducer = new MyReducer();
+ reducer.configure(conf);
+ MapReduceDriver<NullWritable, OrcStruct,
+ OrcKey, OrcValue,
+ NullWritable, OrcStruct> driver =
+ new MapReduceDriver<>(mapper, reducer);
+ driver.setConfiguration(conf);
+ NullWritable nada = NullWritable.get();
+ OrcStruct input = (OrcStruct) OrcStruct.createValue(
+ TypeDescription.fromString("struct<one:struct<x:int,y:int>,two:struct<z:string>>"));
+ IntWritable x =
+ (IntWritable) ((OrcStruct) input.getFieldValue(0)).getFieldValue(0);
+ IntWritable y =
+ (IntWritable) ((OrcStruct) input.getFieldValue(0)).getFieldValue(1);
+ Text z = (Text) ((OrcStruct) input.getFieldValue(1)).getFieldValue(0);
+
+ // generate the input stream
+ for(int r=0; r < 20; ++r) {
+ x.set(100 - (r / 4));
+ y.set(r*2);
+ z.set(Integer.toHexString(r));
+ driver.withInput(nada, input);
+ }
+
+ // generate the expected outputs
+ for(int g=4; g >= 0; --g) {
+ x.set(100 - g);
+ for(int i=0; i < 4; ++i) {
+ int r = g * 4 + i;
+ y.set(r * 2);
+ z.set(Integer.toHexString(r));
+ driver.withOutput(nada, input);
+ }
+ }
+ driver.runTest();
+ }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java
new file mode 100644
index 0000000..a915ed3
--- /dev/null
+++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapred;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestOrcOutputFormat {
+
+ Path workDir = new Path(System.getProperty("test.tmp.dir",
+ "target" + File.separator + "test" + File.separator + "tmp"));
+ JobConf conf = new JobConf();
+ FileSystem fs;
+
+ {
+ try {
+ fs = FileSystem.getLocal(conf).getRaw();
+ fs.delete(workDir, true);
+ fs.mkdirs(workDir);
+ } catch (IOException e) {
+ throw new IllegalStateException("bad fs init", e);
+ }
+ }
+
+ static class NullOutputCommitter extends OutputCommitter {
+
+ @Override
+ public void setupJob(JobContext jobContext) {
+ // PASS
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext taskAttemptContext) {
+ // PASS
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
+ return false;
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext taskAttemptContext) {
+ // PASS
+ }
+
+ @Override
+ public void abortTask(TaskAttemptContext taskAttemptContext) {
+ // PASS
+ }
+ }
+
+ @Test
+ public void testAllTypes() throws Exception {
+ conf.set("mapreduce.task.attempt.id", "attempt_20160101_0001_m_000001_0");
+ conf.setOutputCommitter(NullOutputCommitter.class);
+ final String typeStr = "struct<b1:binary,b2:boolean,b3:tinyint," +
+ "c:char(10),d1:date,d2:decimal(20,5),d3:double,fff:float,int:int," +
+ "l:array<bigint>,map:map<smallint,string>," +
+ "str:struct<u:uniontype<timestamp,varchar(100)>>,ts:timestamp>";
+ OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, typeStr);
+ FileOutputFormat.setOutputPath(conf, workDir);
+ TypeDescription type = TypeDescription.fromString(typeStr);
+
+ // build a row object
+ OrcStruct row = (OrcStruct) OrcStruct.createValue(type);
+ ((BytesWritable) row.getFieldValue(0)).set(new byte[]{1,2,3,4}, 0, 4);
+ ((BooleanWritable) row.getFieldValue(1)).set(true);
+ ((ByteWritable) row.getFieldValue(2)).set((byte) 23);
+ ((Text) row.getFieldValue(3)).set("aaabbbcccddd");
+ SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
+ ((DateWritable) row.getFieldValue(4)).set(DateWritable.millisToDays
+ (format.parse("2016-04-01").getTime()));
+ ((HiveDecimalWritable) row.getFieldValue(5)).set(new HiveDecimalWritable("1.23"));
+ ((DoubleWritable) row.getFieldValue(6)).set(1.5);
+ ((FloatWritable) row.getFieldValue(7)).set(4.5f);
+ ((IntWritable) row.getFieldValue(8)).set(31415);
+ OrcList<LongWritable> longList = (OrcList<LongWritable>) row.getFieldValue(9);
+ longList.add(new LongWritable(123));
+ longList.add(new LongWritable(456));
+ OrcMap<ShortWritable,Text> map = (OrcMap<ShortWritable,Text>) row.getFieldValue(10);
+ map.put(new ShortWritable((short) 1000), new Text("aaaa"));
+ map.put(new ShortWritable((short) 123), new Text("bbbb"));
+ OrcStruct struct = (OrcStruct) row.getFieldValue(11);
+ OrcUnion union = (OrcUnion) struct.getFieldValue(0);
+ union.set((byte) 1, new Text("abcde"));
+ ((OrcTimestamp) row.getFieldValue(12)).set("1996-12-11 15:00:00");
+ NullWritable nada = NullWritable.get();
+ RecordWriter<NullWritable, OrcStruct> writer =
+ new OrcOutputFormat<OrcStruct>().getRecordWriter(fs, conf, "all.orc",
+ Reporter.NULL);
+ for(int r=0; r < 10; ++r) {
+ row.setFieldValue(8, new IntWritable(r * 10));
+ writer.write(nada, row);
+ }
+ union.set((byte) 0, new OrcTimestamp("2011-12-25 12:34:56"));
+ for(int r=0; r < 10; ++r) {
+ row.setFieldValue(8, new IntWritable(r * 10 + 100));
+ writer.write(nada, row);
+ }
+ OrcStruct row2 = new OrcStruct(type);
+ writer.write(nada, row2);
+ row.setFieldValue(8, new IntWritable(210));
+ writer.write(nada, row);
+ writer.close(Reporter.NULL);
+
+ FileSplit split = new FileSplit(new Path(workDir, "all.orc"), 0, 100000,
+ new String[0]);
+ RecordReader<NullWritable, OrcStruct> reader =
+ new OrcInputFormat<OrcStruct>().getRecordReader(split, conf,
+ Reporter.NULL);
+ nada = reader.createKey();
+ row = reader.createValue();
+ for(int r=0; r < 22; ++r) {
+ assertEquals(true, reader.next(nada, row));
+ if (r == 20) {
+ for(int c=0; c < 12; ++c) {
+ assertEquals(null, row.getFieldValue(c));
+ }
+ } else {
+ assertEquals(new BytesWritable(new byte[]{1, 2, 3, 4}), row.getFieldValue(0));
+ assertEquals(new BooleanWritable(true), row.getFieldValue(1));
+ assertEquals(new ByteWritable((byte) 23), row.getFieldValue(2));
+ assertEquals(new Text("aaabbbcccd"), row.getFieldValue(3));
+ assertEquals(new DateWritable(DateWritable.millisToDays
+ (format.parse("2016-04-01").getTime())), row.getFieldValue(4));
+ assertEquals(new HiveDecimalWritable("1.23"), row.getFieldValue(5));
+ assertEquals(new DoubleWritable(1.5), row.getFieldValue(6));
+ assertEquals(new FloatWritable(4.5f), row.getFieldValue(7));
+ assertEquals(new IntWritable(r * 10), row.getFieldValue(8));
+ assertEquals(longList, row.getFieldValue(9));
+ assertEquals(map, row.getFieldValue(10));
+ if (r < 10) {
+ union.set((byte) 1, new Text("abcde"));
+ } else {
+ union.set((byte) 0, new OrcTimestamp("2011-12-25 12:34:56"));
+ }
+ assertEquals("row " + r, struct, row.getFieldValue(11));
+ assertEquals("row " + r, new OrcTimestamp("1996-12-11 15:00:00"),
+ row.getFieldValue(12));
+ }
+ }
+ assertEquals(false, reader.next(nada, row));
+ }
+
+ /**
+ * Test the case where the top level isn't a struct, but a long.
+ */
+ @Test
+ public void testLongRoot() throws Exception {
+ conf.set("mapreduce.task.attempt.id", "attempt_20160101_0001_m_000001_0");
+ conf.setOutputCommitter(NullOutputCommitter.class);
+ conf.set(OrcConf.COMPRESS.getAttribute(), "SNAPPY");
+ conf.setInt(OrcConf.ROW_INDEX_STRIDE.getAttribute(), 1000);
+ conf.setInt(OrcConf.BUFFER_SIZE.getAttribute(), 64 * 1024);
+ conf.set(OrcConf.WRITE_FORMAT.getAttribute(), "0.11");
+ final String typeStr = "bigint";
+ OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, typeStr);
+ FileOutputFormat.setOutputPath(conf, workDir);
+ TypeDescription type = TypeDescription.fromString(typeStr);
+ LongWritable value = new LongWritable();
+ NullWritable nada = NullWritable.get();
+ RecordWriter<NullWritable, LongWritable> writer =
+ new OrcOutputFormat<LongWritable>().getRecordWriter(fs, conf,
+ "long.orc", Reporter.NULL);
+ for(long lo=0; lo < 2000; ++lo) {
+ value.set(lo);
+ writer.write(nada, value);
+ }
+ writer.close(Reporter.NULL);
+
+ Path path = new Path(workDir, "long.orc");
+ Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+ assertEquals(CompressionKind.SNAPPY, file.getCompressionKind());
+ assertEquals(2000, file.getNumberOfRows());
+ assertEquals(1000, file.getRowIndexStride());
+ assertEquals(64 * 1024, file.getCompressionSize());
+ assertEquals(OrcFile.Version.V_0_11, file.getFileVersion());
+ FileSplit split = new FileSplit(path, 0, 100000,
+ new String[0]);
+ RecordReader<NullWritable, LongWritable> reader =
+ new OrcInputFormat<LongWritable>().getRecordReader(split, conf,
+ Reporter.NULL);
+ nada = reader.createKey();
+ value = reader.createValue();
+ for(long lo=0; lo < 2000; ++lo) {
+ assertEquals(true, reader.next(nada, value));
+ assertEquals(lo, value.get());
+ }
+ assertEquals(false, reader.next(nada, value));
+ }
+
+ /**
+ * Make sure that the writer unwraps the OrcKey and writes the wrapped value.
+ * @throws Exception
+ */
+ @Test
+ public void testOrcKey() throws Exception {
+ conf.set("mapreduce.output.fileoutputformat.outputdir", workDir.toString());
+ conf.set("mapreduce.task.attempt.id", "attempt_jt0_0_m_0_0");
+ String TYPE_STRING = "struct<i:int,s:string>";
+ OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, TYPE_STRING);
+ conf.setOutputCommitter(NullOutputCommitter.class);
+ TypeDescription schema = TypeDescription.fromString(TYPE_STRING);
+ OrcKey key = new OrcKey(new OrcStruct(schema));
+ RecordWriter<NullWritable, Writable> writer =
+ new OrcOutputFormat<>().getRecordWriter(fs, conf, "key.orc",
+ Reporter.NULL);
+ NullWritable nada = NullWritable.get();
+ for(int r=0; r < 2000; ++r) {
+ ((OrcStruct) key.key).setAllFields(new IntWritable(r),
+ new Text(Integer.toString(r)));
+ writer.write(nada, key);
+ }
+ writer.close(Reporter.NULL);
+ Path path = new Path(workDir, "key.orc");
+ Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+ assertEquals(2000, file.getNumberOfRows());
+ assertEquals(TYPE_STRING, file.getSchema().toString());
+ }
+
+ /**
+ * Make sure that the writer unwraps the OrcValue and writes the wrapped value.
+ * @throws Exception
+ */
+ @Test
+ public void testOrcValue() throws Exception {
+ conf.set("mapreduce.output.fileoutputformat.outputdir", workDir.toString());
+ conf.set("mapreduce.task.attempt.id", "attempt_jt0_0_m_0_0");
+ String TYPE_STRING = "struct<i:int>";
+ OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, TYPE_STRING);
+ conf.setOutputCommitter(NullOutputCommitter.class);
+ TypeDescription schema = TypeDescription.fromString(TYPE_STRING);
+ OrcValue value = new OrcValue(new OrcStruct(schema));
+ RecordWriter<NullWritable, Writable> writer =
+ new OrcOutputFormat<>().getRecordWriter(fs, conf, "value.orc",
+ Reporter.NULL);
+ NullWritable nada = NullWritable.get();
+ for(int r=0; r < 3000; ++r) {
+ ((OrcStruct) value.value).setAllFields(new IntWritable(r));
+ writer.write(nada, value);
+ }
+ writer.close(Reporter.NULL);
+ Path path = new Path(workDir, "value.orc");
+ Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+ assertEquals(3000, file.getNumberOfRows());
+ assertEquals(TYPE_STRING, file.getSchema().toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java
index d32ce94..82699ed 100644
--- a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java
+++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java
@@ -76,6 +76,11 @@ public class TestOrcStruct {
assertEquals(new IntWritable(42), struct.getFieldValue("i"));
assertEquals(new DoubleWritable(1.5), struct.getFieldValue(1));
assertEquals(new Text("Moria"), struct.getFieldValue("k"));
+ struct.setAllFields(new IntWritable(123), new DoubleWritable(4.5),
+ new Text("ok"));
+ assertEquals("123", struct.getFieldValue(0).toString());
+ assertEquals("4.5", struct.getFieldValue(1).toString());
+ assertEquals("ok", struct.getFieldValue(2).toString());
}
@Test
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/test/org/apache/orc/mapred/other/TestOrcOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/other/TestOrcOutputFormat.java b/java/mapreduce/src/test/org/apache/orc/mapred/other/TestOrcOutputFormat.java
deleted file mode 100644
index ce5523f..0000000
--- a/java/mapreduce/src/test/org/apache/orc/mapred/other/TestOrcOutputFormat.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.mapred.other;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.OutputCommitter;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.apache.orc.CompressionKind;
-import org.apache.orc.OrcConf;
-import org.apache.orc.OrcFile;
-import org.apache.orc.Reader;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.mapred.OrcInputFormat;
-import org.apache.orc.mapred.OrcList;
-import org.apache.orc.mapred.OrcMap;
-import org.apache.orc.mapred.OrcOutputFormat;
-import org.apache.orc.mapred.OrcStruct;
-import org.apache.orc.mapred.OrcTimestamp;
-import org.apache.orc.mapred.OrcUnion;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestOrcOutputFormat {
-
- Path workDir = new Path(System.getProperty("test.tmp.dir",
- "target" + File.separator + "test" + File.separator + "tmp"));
- JobConf conf = new JobConf();
- FileSystem fs;
-
- {
- try {
- fs = FileSystem.getLocal(conf).getRaw();
- fs.delete(workDir, true);
- fs.mkdirs(workDir);
- } catch (IOException e) {
- throw new IllegalStateException("bad fs init", e);
- }
- }
-
- static class NullOutputCommitter extends OutputCommitter {
-
- @Override
- public void setupJob(JobContext jobContext) {
- // PASS
- }
-
- @Override
- public void setupTask(TaskAttemptContext taskAttemptContext) {
-
- }
-
- @Override
- public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
- return false;
- }
-
- @Override
- public void commitTask(TaskAttemptContext taskAttemptContext) {
- // PASS
- }
-
- @Override
- public void abortTask(TaskAttemptContext taskAttemptContext) {
- // PASS
- }
- }
-
- @Test
- public void testAllTypes() throws Exception {
- conf.set("mapreduce.task.attempt.id", "attempt_20160101_0001_m_000001_0");
- conf.setOutputCommitter(NullOutputCommitter.class);
- final String typeStr = "struct<b1:binary,b2:boolean,b3:tinyint," +
- "c:char(10),d1:date,d2:decimal(20,5),d3:double,fff:float,int:int," +
- "l:array<bigint>,map:map<smallint,string>," +
- "str:struct<u:uniontype<timestamp,varchar(100)>>,ts:timestamp>";
- conf.set(OrcConf.SCHEMA.getAttribute(), typeStr);
- FileOutputFormat.setOutputPath(conf, workDir);
- TypeDescription type = TypeDescription.fromString(typeStr);
-
- // build a row object
- OrcStruct row = (OrcStruct) OrcStruct.createValue(type);
- ((BytesWritable) row.getFieldValue(0)).set(new byte[]{1,2,3,4}, 0, 4);
- ((BooleanWritable) row.getFieldValue(1)).set(true);
- ((ByteWritable) row.getFieldValue(2)).set((byte) 23);
- ((Text) row.getFieldValue(3)).set("aaabbbcccddd");
- SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
- ((DateWritable) row.getFieldValue(4)).set(DateWritable.millisToDays
- (format.parse("2016-04-01").getTime()));
- ((HiveDecimalWritable) row.getFieldValue(5)).set(new HiveDecimalWritable("1.23"));
- ((DoubleWritable) row.getFieldValue(6)).set(1.5);
- ((FloatWritable) row.getFieldValue(7)).set(4.5f);
- ((IntWritable) row.getFieldValue(8)).set(31415);
- OrcList<LongWritable> longList = (OrcList<LongWritable>) row.getFieldValue(9);
- longList.add(new LongWritable(123));
- longList.add(new LongWritable(456));
- OrcMap<ShortWritable,Text> map = (OrcMap<ShortWritable,Text>) row.getFieldValue(10);
- map.put(new ShortWritable((short) 1000), new Text("aaaa"));
- map.put(new ShortWritable((short) 123), new Text("bbbb"));
- OrcStruct struct = (OrcStruct) row.getFieldValue(11);
- OrcUnion union = (OrcUnion) struct.getFieldValue(0);
- union.set((byte) 1, new Text("abcde"));
- ((OrcTimestamp) row.getFieldValue(12)).set("1996-12-11 15:00:00");
- NullWritable nada = NullWritable.get();
- RecordWriter<NullWritable, OrcStruct> writer =
- new OrcOutputFormat<OrcStruct>().getRecordWriter(fs, conf, "all.orc",
- Reporter.NULL);
- for(int r=0; r < 10; ++r) {
- row.setFieldValue(8, new IntWritable(r * 10));
- writer.write(nada, row);
- }
- union.set((byte) 0, new OrcTimestamp("2011-12-25 12:34:56"));
- for(int r=0; r < 10; ++r) {
- row.setFieldValue(8, new IntWritable(r * 10 + 100));
- writer.write(nada, row);
- }
- OrcStruct row2 = new OrcStruct(type);
- writer.write(nada, row2);
- row.setFieldValue(8, new IntWritable(210));
- writer.write(nada, row);
- writer.close(Reporter.NULL);
-
- FileSplit split = new FileSplit(new Path(workDir, "all.orc"), 0, 100000,
- new String[0]);
- RecordReader<NullWritable, OrcStruct> reader =
- new OrcInputFormat<OrcStruct>().getRecordReader(split, conf,
- Reporter.NULL);
- nada = reader.createKey();
- row = reader.createValue();
- for(int r=0; r < 22; ++r) {
- assertEquals(true, reader.next(nada, row));
- if (r == 20) {
- for(int c=0; c < 12; ++c) {
- assertEquals(null, row.getFieldValue(c));
- }
- } else {
- assertEquals(new BytesWritable(new byte[]{1, 2, 3, 4}), row.getFieldValue(0));
- assertEquals(new BooleanWritable(true), row.getFieldValue(1));
- assertEquals(new ByteWritable((byte) 23), row.getFieldValue(2));
- assertEquals(new Text("aaabbbcccd"), row.getFieldValue(3));
- assertEquals(new DateWritable(DateWritable.millisToDays
- (format.parse("2016-04-01").getTime())), row.getFieldValue(4));
- assertEquals(new HiveDecimalWritable("1.23"), row.getFieldValue(5));
- assertEquals(new DoubleWritable(1.5), row.getFieldValue(6));
- assertEquals(new FloatWritable(4.5f), row.getFieldValue(7));
- assertEquals(new IntWritable(r * 10), row.getFieldValue(8));
- assertEquals(longList, row.getFieldValue(9));
- assertEquals(map, row.getFieldValue(10));
- if (r < 10) {
- union.set((byte) 1, new Text("abcde"));
- } else {
- union.set((byte) 0, new OrcTimestamp("2011-12-25 12:34:56"));
- }
- assertEquals("row " + r, struct, row.getFieldValue(11));
- assertEquals("row " + r, new OrcTimestamp("1996-12-11 15:00:00"),
- row.getFieldValue(12));
- }
- }
- assertEquals(false, reader.next(nada, row));
- }
-
- /**
- * Test the case where the top level isn't a struct, but a long.
- */
- @Test
- public void testLongRoot() throws Exception {
- conf.set("mapreduce.task.attempt.id", "attempt_20160101_0001_m_000001_0");
- conf.setOutputCommitter(NullOutputCommitter.class);
- conf.set(OrcConf.COMPRESS.getAttribute(), "SNAPPY");
- conf.setInt(OrcConf.ROW_INDEX_STRIDE.getAttribute(), 1000);
- conf.setInt(OrcConf.BUFFER_SIZE.getAttribute(), 64 * 1024);
- conf.set(OrcConf.WRITE_FORMAT.getAttribute(), "0.11");
- final String typeStr = "bigint";
- conf.set(OrcConf.SCHEMA.getAttribute(), typeStr);
- FileOutputFormat.setOutputPath(conf, workDir);
- TypeDescription type = TypeDescription.fromString(typeStr);
- LongWritable value = new LongWritable();
- NullWritable nada = NullWritable.get();
- RecordWriter<NullWritable, LongWritable> writer =
- new OrcOutputFormat<LongWritable>().getRecordWriter(fs, conf,
- "long.orc", Reporter.NULL);
- for(long lo=0; lo < 2000; ++lo) {
- value.set(lo);
- writer.write(nada, value);
- }
- writer.close(Reporter.NULL);
-
- Path path = new Path(workDir, "long.orc");
- Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf));
- assertEquals(CompressionKind.SNAPPY, file.getCompressionKind());
- assertEquals(2000, file.getNumberOfRows());
- assertEquals(1000, file.getRowIndexStride());
- assertEquals(64 * 1024, file.getCompressionSize());
- assertEquals(OrcFile.Version.V_0_11, file.getFileVersion());
- FileSplit split = new FileSplit(path, 0, 100000,
- new String[0]);
- RecordReader<NullWritable, LongWritable> reader =
- new OrcInputFormat<LongWritable>().getRecordReader(split, conf,
- Reporter.NULL);
- nada = reader.createKey();
- value = reader.createValue();
- for(long lo=0; lo < 2000; ++lo) {
- assertEquals(true, reader.next(nada, value));
- assertEquals(lo, value.get());
- }
- assertEquals(false, reader.next(nada, value));
- }
-}
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java
new file mode 100644
index 0000000..27543c1
--- /dev/null
+++ b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapreduce;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapred.OrcKey;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestMapreduceOrcOutputFormat {
+
+ Path workDir = new Path(System.getProperty("test.tmp.dir",
+ "target" + File.separator + "test" + File.separator + "tmp"));
+ JobConf conf = new JobConf();
+ FileSystem fs;
+
+ {
+ try {
+ fs = FileSystem.getLocal(conf).getRaw();
+ fs.delete(workDir, true);
+ fs.mkdirs(workDir);
+ } catch (IOException e) {
+ throw new IllegalStateException("bad fs init", e);
+ }
+ }
+
+ @Test
+ public void testPredicatePushdown() throws Exception {
+ TaskAttemptID id = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
+ TaskAttemptContext attemptContext = new TaskAttemptContextImpl(conf, id);
+ final String typeStr = "struct<i:int,s:string>";
+ OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, typeStr);
+ conf.set("mapreduce.output.fileoutputformat.outputdir", workDir.toString());
+ conf.setInt(OrcConf.ROW_INDEX_STRIDE.getAttribute(), 1000);
+ conf.setBoolean(OrcOutputFormat.SKIP_TEMP_DIRECTORY, true);
+ OutputFormat<NullWritable, OrcStruct> outputFormat =
+ new OrcOutputFormat<OrcStruct>();
+ RecordWriter<NullWritable, OrcStruct> writer =
+ outputFormat.getRecordWriter(attemptContext);
+
+ // write 4000 rows with the integer and the binary string
+ TypeDescription type = TypeDescription.fromString(typeStr);
+ OrcStruct row = (OrcStruct) OrcStruct.createValue(type);
+ NullWritable nada = NullWritable.get();
+ for(int r=0; r < 4000; ++r) {
+ row.setFieldValue(0, new IntWritable(r));
+ row.setFieldValue(1, new Text(Integer.toBinaryString(r)));
+ writer.write(nada, row);
+ }
+ writer.close(attemptContext);
+
+ OrcInputFormat.setSearchArgument(conf,
+ SearchArgumentFactory.newBuilder()
+ .between("i", PredicateLeaf.Type.LONG, new Long(1500), new Long(1999))
+ .build(), new String[]{null, "i", "s"});
+ FileSplit split = new FileSplit(new Path(workDir, "part-m-00000.orc"),
+ 0, 1000000, new String[0]);
+ RecordReader<NullWritable, OrcStruct> reader =
+ new OrcInputFormat<OrcStruct>().createRecordReader(split,
+ attemptContext);
+ // with a 1000 row stride, the sarg prunes every row group except the one
+ // covering rows 1000 to 1999
+ for(int r=1000; r < 2000; ++r) {
+ assertEquals(true, reader.nextKeyValue());
+ row = reader.getCurrentValue();
+ assertEquals(r, ((IntWritable) row.getFieldValue(0)).get());
+ assertEquals(Integer.toBinaryString(r), row.getFieldValue(1).toString());
+ }
+ assertEquals(false, reader.nextKeyValue());
+ }
+
+ @Test
+ public void testColumnSelection() throws Exception {
+ String typeStr = "struct<i:int,j:int,k:int>";
+ OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, typeStr);
+ conf.set("mapreduce.output.fileoutputformat.outputdir", workDir.toString());
+ conf.setInt(OrcConf.ROW_INDEX_STRIDE.getAttribute(), 1000);
+ conf.setBoolean(OrcOutputFormat.SKIP_TEMP_DIRECTORY, true);
+ TaskAttemptID id = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 1);
+ TaskAttemptContext attemptContext = new TaskAttemptContextImpl(conf, id);
+ OutputFormat<NullWritable, OrcStruct> outputFormat =
+ new OrcOutputFormat<OrcStruct>();
+ RecordWriter<NullWritable, OrcStruct> writer =
+ outputFormat.getRecordWriter(attemptContext);
+
+ // write 4000 rows with the integer and the binary string
+ TypeDescription type = TypeDescription.fromString(typeStr);
+ OrcStruct row = (OrcStruct) OrcStruct.createValue(type);
+ NullWritable nada = NullWritable.get();
+ for(int r=0; r < 3000; ++r) {
+ row.setFieldValue(0, new IntWritable(r));
+ row.setFieldValue(1, new IntWritable(r * 2));
+ row.setFieldValue(2, new IntWritable(r * 3));
+ writer.write(nada, row);
+ }
+ writer.close(attemptContext);
+
+ conf.set(OrcConf.INCLUDE_COLUMNS.getAttribute(), "0,2");
+ FileSplit split = new FileSplit(new Path(workDir, "part-m-00000.orc"),
+ 0, 1000000, new String[0]);
+ RecordReader<NullWritable, OrcStruct> reader =
+ new OrcInputFormat<OrcStruct>().createRecordReader(split,
+ attemptContext);
+ // only columns 0 and 2 were included, so column j should read back as null
+ for(int r=0; r < 3000; ++r) {
+ assertEquals(true, reader.nextKeyValue());
+ row = reader.getCurrentValue();
+ assertEquals(r, ((IntWritable) row.getFieldValue(0)).get());
+ assertEquals(null, row.getFieldValue(1));
+ assertEquals(r * 3, ((IntWritable) row.getFieldValue(2)).get());
+ }
+ assertEquals(false, reader.nextKeyValue());
+ }
+
+
+ /**
+ * Make sure that the writer unwraps the OrcKey and writes the wrapped value.
+ * @throws Exception
+ */
+ @Test
+ public void testOrcKey() throws Exception {
+ conf.set("mapreduce.output.fileoutputformat.outputdir", workDir.toString());
+ String TYPE_STRING = "struct<i:int,s:string>";
+ OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, TYPE_STRING);
+ conf.setBoolean(OrcOutputFormat.SKIP_TEMP_DIRECTORY, true);
+ TaskAttemptID id = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 1);
+ TaskAttemptContext attemptContext = new TaskAttemptContextImpl(conf, id);
+ TypeDescription schema = TypeDescription.fromString(TYPE_STRING);
+ OrcKey key = new OrcKey(new OrcStruct(schema));
+ RecordWriter<NullWritable, Writable> writer =
+ new OrcOutputFormat<>().getRecordWriter(attemptContext);
+ NullWritable nada = NullWritable.get();
+ for(int r=0; r < 2000; ++r) {
+ ((OrcStruct) key.key).setAllFields(new IntWritable(r),
+ new Text(Integer.toString(r)));
+ writer.write(nada, key);
+ }
+ writer.close(attemptContext);
+ Path path = new Path(workDir, "part-m-00000.orc");
+ Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+ assertEquals(2000, file.getNumberOfRows());
+ assertEquals(TYPE_STRING, file.getSchema().toString());
+ }
+
+ /**
+ * Make sure that the writer unwraps the OrcValue and writes the wrapped value.
+ * @throws Exception
+ */
+ @Test
+ public void testOrcValue() throws Exception {
+ conf.set("mapreduce.output.fileoutputformat.outputdir", workDir.toString());
+ String TYPE_STRING = "struct<i:int>";
+ OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, TYPE_STRING);
+ conf.setBoolean(OrcOutputFormat.SKIP_TEMP_DIRECTORY, true);
+ TaskAttemptID id = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 1);
+ TaskAttemptContext attemptContext = new TaskAttemptContextImpl(conf, id);
+
+ TypeDescription schema = TypeDescription.fromString(TYPE_STRING);
+ OrcValue value = new OrcValue(new OrcStruct(schema));
+ RecordWriter<NullWritable, Writable> writer =
+ new OrcOutputFormat<>().getRecordWriter(attemptContext);
+ NullWritable nada = NullWritable.get();
+ for(int r=0; r < 3000; ++r) {
+ ((OrcStruct) value.value).setAllFields(new IntWritable(r));
+ writer.write(nada, value);
+ }
+ writer.close(attemptContext);
+ Path path = new Path(workDir, "part-m-00000.orc");
+ Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+ assertEquals(3000, file.getNumberOfRows());
+ assertEquals(TYPE_STRING, file.getSchema().toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMrUnit.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMrUnit.java b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMrUnit.java
new file mode 100644
index 0000000..01208e1
--- /dev/null
+++ b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMrUnit.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapreduce;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
+import org.apache.orc.OrcConf;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapred.OrcKey;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
+
+public class TestMrUnit {
+ JobConf conf = new JobConf();
+
+ /**
+ * Split the input struct into its two parts.
+ */
+ public static class MyMapper
+ extends Mapper<NullWritable, OrcStruct, OrcKey, OrcValue> {
+ private OrcKey keyWrapper = new OrcKey();
+ private OrcValue valueWrapper = new OrcValue();
+
+ @Override
+ protected void map(NullWritable key,
+ OrcStruct value,
+ Context context
+ ) throws IOException, InterruptedException {
+ keyWrapper.key = value.getFieldValue(0);
+ valueWrapper.value = value.getFieldValue(1);
+ context.write(keyWrapper, valueWrapper);
+ }
+ }
+
+ /**
+ * Glue the key and values back together.
+ */
+ public static class MyReducer
+ extends Reducer<OrcKey, OrcValue, NullWritable, OrcStruct> {
+ private OrcStruct output = new OrcStruct(TypeDescription.fromString
+ ("struct<first:struct<x:int,y:int>,second:struct<z:string>>"));
+ private final NullWritable nada = NullWritable.get();
+
+ @Override
+ protected void reduce(OrcKey key,
+ Iterable<OrcValue> values,
+ Context context
+ ) throws IOException, InterruptedException {
+ output.setFieldValue(0, key.key);
+ for(OrcValue value: values) {
+ output.setFieldValue(1, value.value);
+ context.write(nada, output);
+ }
+ }
+ }
+
+ /**
+ * This class is intended to support MRUnit's object copying for input and
+ * output objects.
+ *
+ * Real mapreduce contexts should NEVER use this class.
+ *
+ * The type string is serialized before each value.
+ */
+ public static class OrcStructSerialization
+ implements Serialization<OrcStruct> {
+
+ @Override
+ public boolean accept(Class<?> cls) {
+ return OrcStruct.class.isAssignableFrom(cls);
+ }
+
+ @Override
+ public Serializer<OrcStruct> getSerializer(Class<OrcStruct> aClass) {
+ return new Serializer<OrcStruct>() {
+ DataOutputStream dataOut;
+
+ public void open(OutputStream out) {
+ if(out instanceof DataOutputStream) {
+ dataOut = (DataOutputStream)out;
+ } else {
+ dataOut = new DataOutputStream(out);
+ }
+ }
+
+ public void serialize(OrcStruct w) throws IOException {
+ Text.writeString(dataOut, w.getSchema().toString());
+ w.write(dataOut);
+ }
+
+ public void close() throws IOException {
+ dataOut.close();
+ }
+ };
+ }
+
+ @Override
+ public Deserializer<OrcStruct> getDeserializer(Class<OrcStruct> aClass) {
+ return new Deserializer<OrcStruct>() {
+ DataInputStream input;
+
+ @Override
+ public void open(InputStream inputStream) throws IOException {
+ if(inputStream instanceof DataInputStream) {
+ input = (DataInputStream)inputStream;
+ } else {
+ input = new DataInputStream(inputStream);
+ }
+ }
+
+ @Override
+ public OrcStruct deserialize(OrcStruct orcStruct) throws IOException {
+ String typeStr = Text.readString(input);
+ OrcStruct result = new OrcStruct(TypeDescription.fromString(typeStr));
+ result.readFields(input);
+ return result;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // PASS
+ }
+ };
+ }
+ }
+
+ @Test
+ public void testMapreduce() throws IOException {
+ conf.set("io.serializations",
+ OrcStructSerialization.class.getName() + "," +
+ WritableSerialization.class.getName());
+ OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.setString(conf, "struct<x:int,y:int>");
+ OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(conf, "struct<z:string>");
+ MyMapper mapper = new MyMapper();
+ MyReducer reducer = new MyReducer();
+ MapReduceDriver<NullWritable, OrcStruct,
+ OrcKey, OrcValue,
+ NullWritable, OrcStruct> driver =
+ new MapReduceDriver<>(mapper, reducer);
+ driver.setConfiguration(conf);
+ NullWritable nada = NullWritable.get();
+ OrcStruct input = (OrcStruct) OrcStruct.createValue(
+ TypeDescription.fromString("struct<one:struct<x:int,y:int>,two:struct<z:string>>"));
+ IntWritable x =
+ (IntWritable) ((OrcStruct) input.getFieldValue(0)).getFieldValue(0);
+ IntWritable y =
+ (IntWritable) ((OrcStruct) input.getFieldValue(0)).getFieldValue(1);
+ Text z = (Text) ((OrcStruct) input.getFieldValue(1)).getFieldValue(0);
+
+ // generate the input stream
+ for(int r=0; r < 20; ++r) {
+ x.set(100 - (r / 4));
+ y.set(r*2);
+ z.set(Integer.toHexString(r));
+ driver.withInput(nada, input);
+ }
+
+ // generate the expected outputs
+ for(int g=4; g >= 0; --g) {
+ x.set(100 - g);
+ for(int i=0; i < 4; ++i) {
+ int r = g * 4 + i;
+ y.set(r * 2);
+ z.set(Integer.toHexString(r));
+ driver.withOutput(nada, input);
+ }
+ }
+ driver.runTest();
+ }
+}
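
For anyone wiring the same mapper and reducer into a real job rather than
MRUnit, a minimal driver sketch follows. It relies only on classes and
configuration knobs added by this patch; the driver class name, job name,
and input/output paths are placeholders, and MyMapper/MyReducer refer to
the test classes above (as inner classes they would not compile standalone).

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.NullWritable;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  import org.apache.orc.OrcConf;
  import org.apache.orc.mapred.OrcKey;
  import org.apache.orc.mapred.OrcStruct;
  import org.apache.orc.mapred.OrcValue;
  import org.apache.orc.mapreduce.OrcInputFormat;
  import org.apache.orc.mapreduce.OrcOutputFormat;

  public class OrcShuffleDriver {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // the ORC-typed shuffle and output schemas, as in the test above
      OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.setString(conf, "struct<x:int,y:int>");
      OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(conf, "struct<z:string>");
      OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf,
          "struct<first:struct<x:int,y:int>,second:struct<z:string>>");
      Job job = Job.getInstance(conf, "orc-shuffle-example");
      job.setMapperClass(MyMapper.class);      // test class above
      job.setReducerClass(MyReducer.class);    // test class above
      job.setMapOutputKeyClass(OrcKey.class);
      job.setMapOutputValueClass(OrcValue.class);
      job.setOutputKeyClass(NullWritable.class);
      job.setOutputValueClass(OrcStruct.class);
      job.setInputFormatClass(OrcInputFormat.class);
      job.setOutputFormatClass(OrcOutputFormat.class);
      FileInputFormat.addInputPath(job, new Path("/tmp/orc-in"));
      FileOutputFormat.setOutputPath(job, new Path("/tmp/orc-out"));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
  }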
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/pom.xml
----------------------------------------------------------------------
diff --git a/java/pom.xml b/java/pom.xml
index 9941dee..2eacd7a 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -47,6 +47,7 @@
<junit.version>4.11</junit.version>
<kryo.version>3.0.3</kryo.version>
<mockito.version>1.9.5</mockito.version>
+ <mrunit.version>1.1.0</mrunit.version>
<protobuf.version>2.5.0</protobuf.version>
<slf4j.version>1.7.5</slf4j.version>
<snappy.version>0.2</snappy.version>
[2/2] orc git commit: ORC-52 Add support for
org.apache.hadoop.mapreduce InputFormat and OutputFormat. (omalley)
Posted by om...@apache.org.
ORC-52 Add support for org.apache.hadoop.mapreduce InputFormat and
OutputFormat. (omalley)
Fixes #27
Signed-off-by: Owen O'Malley <om...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/3bb5ce53
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/3bb5ce53
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/3bb5ce53
Branch: refs/heads/master
Commit: 3bb5ce532180fcaa03fa6d13d5829a18a153ad6e
Parents: 545fe37
Author: Owen O'Malley <om...@apache.org>
Authored: Tue May 24 14:28:36 2016 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Wed Jun 1 20:56:28 2016 -0700
----------------------------------------------------------------------
java/core/src/java/org/apache/orc/OrcConf.java | 31 +-
java/mapreduce/pom.xml | 7 +
.../org/apache/orc/mapred/OrcInputFormat.java | 62 ++-
.../src/java/org/apache/orc/mapred/OrcKey.java | 90 +++
.../src/java/org/apache/orc/mapred/OrcMap.java | 1 -
.../orc/mapred/OrcMapredRecordReader.java | 551 +++++++++++++++++++
.../orc/mapred/OrcMapredRecordWriter.java | 283 ++++++++++
.../org/apache/orc/mapred/OrcOutputFormat.java | 45 +-
.../org/apache/orc/mapred/OrcRecordReader.java | 547 ------------------
.../org/apache/orc/mapred/OrcRecordWriter.java | 277 ----------
.../java/org/apache/orc/mapred/OrcStruct.java | 8 +
.../org/apache/orc/mapred/OrcTimestamp.java | 2 +-
.../java/org/apache/orc/mapred/OrcValue.java | 69 +++
.../apache/orc/mapreduce/OrcInputFormat.java | 71 +++
.../orc/mapreduce/OrcMapreduceRecordReader.java | 119 ++++
.../orc/mapreduce/OrcMapreduceRecordWriter.java | 83 +++
.../apache/orc/mapreduce/OrcOutputFormat.java | 70 +++
.../test/org/apache/orc/mapred/TestMrUnit.java | 223 ++++++++
.../apache/orc/mapred/TestOrcOutputFormat.java | 299 ++++++++++
.../org/apache/orc/mapred/TestOrcStruct.java | 5 +
.../orc/mapred/other/TestOrcOutputFormat.java | 249 ---------
.../mapreduce/TestMapreduceOrcOutputFormat.java | 214 +++++++
.../org/apache/orc/mapreduce/TestMrUnit.java | 203 +++++++
java/pom.xml | 1 +
24 files changed, 2393 insertions(+), 1117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/core/src/java/org/apache/orc/OrcConf.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/OrcConf.java b/java/core/src/java/org/apache/orc/OrcConf.java
index df1b410..a575e18 100644
--- a/java/core/src/java/org/apache/orc/OrcConf.java
+++ b/java/core/src/java/org/apache/orc/OrcConf.java
@@ -101,9 +101,18 @@ public enum OrcConf {
"The maximum size of the file to read for finding the file tail. This\n" +
"is primarily used for streaming ingest to read intermediate\n" +
"footers while the file is still open"),
- SCHEMA("orc.schema", "orc.schema", null,
- "The schema that the user desires to read or write. The values are\n" +
+ MAPRED_INPUT_SCHEMA("orc.mapred.input.schema", null, null,
+ "The schema that the user desires to read. The values are\n" +
"interpreted using TypeDescription.fromString."),
+ MAPRED_SHUFFLE_KEY_SCHEMA("orc.mapred.map.output.key.schema", null, null,
+ "The schema of the MapReduce shuffle key. The values are\n" +
+ "interpreted using TypeDescription.fromString."),
+ MAPRED_SHUFFLE_VALUE_SCHEMA("orc.mapred.map.output.value.schema", null, null,
+ "The schema of the MapReduce shuffle value. The values are\n" +
+ "interpreted using TypeDescription.fromString."),
+ MAPRED_OUTPUT_SCHEMA("orc.mapred.output.schema", null, null,
+ "The schema that the user desires to write. The values are\n" +
+ "interpreted using TypeDescription.fromString."),
INCLUDE_COLUMNS("orc.include.columns", "hive.io.file.readcolumn.ids", null,
"The list of comma separated column ids that should be read with 0\n" +
"being the first column, 1 being the next, and so on. ."),
@@ -151,7 +160,7 @@ public enum OrcConf {
}
if (result == null && conf != null) {
result = conf.get(attribute);
- if (result == null) {
+ if (result == null && hiveConfName != null) {
result = conf.get(hiveConfName);
}
}
@@ -170,6 +179,10 @@ public enum OrcConf {
return getLong(null, conf);
}
+ public void setLong(Configuration conf, long value) {
+ conf.setLong(attribute, value);
+ }
+
public String getString(Properties tbl, Configuration conf) {
String value = lookupValue(tbl, conf);
return value == null ? (String) defaultValue : value;
@@ -179,6 +192,10 @@ public enum OrcConf {
return getString(null, conf);
}
+ public void setString(Configuration conf, String value) {
+ conf.set(attribute, value);
+ }
+
public boolean getBoolean(Properties tbl, Configuration conf) {
String value = lookupValue(tbl, conf);
if (value != null) {
@@ -191,6 +208,10 @@ public enum OrcConf {
return getBoolean(null, conf);
}
+ public void setBoolean(Configuration conf, boolean value) {
+ conf.setBoolean(attribute, value);
+ }
+
public double getDouble(Properties tbl, Configuration conf) {
String value = lookupValue(tbl, conf);
if (value != null) {
@@ -202,4 +223,8 @@ public enum OrcConf {
public double getDouble(Configuration conf) {
return getDouble(null, conf);
}
+
+ public void setDouble(Configuration conf, double value) {
+ conf.setDouble(attribute, value);
+ }
}
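
The new typed setters mirror the existing getters, so callers no longer
need to reach for the raw attribute names. A small illustration (the
values are arbitrary):

  Configuration conf = new Configuration();
  OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, "struct<x:int,y:string>");
  OrcConf.ROW_INDEX_STRIDE.setLong(conf, 10000);
  OrcConf.BLOCK_PADDING.setBoolean(conf, true);
  OrcConf.BLOOM_FILTER_FPP.setDouble(conf, 0.05);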
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/java/mapreduce/pom.xml b/java/mapreduce/pom.xml
index 3b38a40..0b48c82 100644
--- a/java/mapreduce/pom.xml
+++ b/java/mapreduce/pom.xml
@@ -118,6 +118,13 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.mrunit</groupId>
+ <artifactId>mrunit</artifactId>
+ <version>${mrunit.version}</version>
+ <scope>test</scope>
+ <classifier>hadoop2</classifier>
+ </dependency>
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>${mockito.version}</version>
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java
index 78e75f7..ac8ca61 100644
--- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java
@@ -25,7 +25,7 @@ import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;
@@ -46,7 +46,7 @@ import org.apache.orc.TypeDescription;
/**
* A MapReduce/Hive input format for ORC files.
*/
-public class OrcInputFormat<V extends Writable>
+public class OrcInputFormat<V extends WritableComparable>
extends FileInputFormat<NullWritable, V> {
/**
@@ -56,7 +56,8 @@ public class OrcInputFormat<V extends Writable>
* @param columnsStr the comma separated list of column ids
* @return a boolean array
*/
- static boolean[] parseInclude(TypeDescription schema, String columnsStr) {
+ public static boolean[] parseInclude(TypeDescription schema,
+ String columnsStr) {
if (columnsStr == null ||
schema.getCategory() != TypeDescription.Category.STRUCT) {
return null;
@@ -82,39 +83,41 @@ public class OrcInputFormat<V extends Writable>
public static void setSearchArgument(Configuration conf,
SearchArgument sarg,
String[] columnNames) {
- Output out = new Output();
+ Output out = new Output(100000);
new Kryo().writeObject(out, sarg);
- conf.set(OrcConf.KRYO_SARG.getAttribute(),
- Base64.encodeBase64String(out.toBytes()));
+ OrcConf.KRYO_SARG.setString(conf, Base64.encodeBase64String(out.toBytes()));
StringBuilder buffer = new StringBuilder();
- for(int i=0; i < columnNames.length; ++i) {
+ for (int i = 0; i < columnNames.length; ++i) {
if (i != 0) {
buffer.append(',');
}
buffer.append(columnNames[i]);
}
- conf.set(OrcConf.SARG_COLUMNS.getAttribute(), buffer.toString());
+ OrcConf.SARG_COLUMNS.setString(conf, buffer.toString());
}
- @Override
- public RecordReader<NullWritable, V>
- getRecordReader(InputSplit inputSplit,
- JobConf conf,
- Reporter reporter) throws IOException {
- FileSplit split = (FileSplit) inputSplit;
- Reader file = OrcFile.createReader(split.getPath(),
- OrcFile.readerOptions(conf)
- .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)));
+ /**
+ * Build the Reader.Options object based on the configuration and the
+ * range of bytes.
+ * @param conf the job configuration
+ * @param reader the ORC file reader, used for its schema when no read
+ * schema is configured
+ * @param start the byte offset at which to start reading
+ * @param length the number of bytes to read
+ * @return the options to read with
+ */
+ public static Reader.Options buildOptions(Configuration conf,
+ Reader reader,
+ long start,
+ long length) {
TypeDescription schema =
- TypeDescription.fromString(OrcConf.SCHEMA.getString(conf));
+ TypeDescription.fromString(OrcConf.MAPRED_INPUT_SCHEMA.getString(conf));
Reader.Options options = new Reader.Options()
- .range(split.getStart(), split.getLength())
+ .range(start, length)
.useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf))
.skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf));
- if (schema == null) {
- schema = file.getSchema();
- } else {
+ if (schema != null) {
options.schema(schema);
+ } else {
+ schema = reader.getSchema();
}
options.include(parseInclude(schema,
OrcConf.INCLUDE_COLUMNS.getString(conf)));
@@ -126,6 +129,19 @@ public class OrcInputFormat<V extends Writable>
new Kryo().readObject(new Input(sargBytes), SearchArgumentImpl.class);
options.searchArgument(sarg, sargColumns.split(","));
}
- return new OrcRecordReader(file, options);
+ return options;
+ }
+
+ @Override
+ public RecordReader<NullWritable, V>
+ getRecordReader(InputSplit inputSplit,
+ JobConf conf,
+ Reporter reporter) throws IOException {
+ FileSplit split = (FileSplit) inputSplit;
+ Reader file = OrcFile.createReader(split.getPath(),
+ OrcFile.readerOptions(conf)
+ .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)));
+ return new OrcMapredRecordReader<>(file, buildOptions(conf,
+ file, split.getStart(), split.getLength()));
}
}
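
Because buildOptions is now public and static, callers outside the
InputFormat can reuse the same schema/include/SARG handling when opening a
file directly. A sketch using only the APIs in this patch; the path is a
placeholder and Long.MAX_VALUE is used as the range length to cover the
whole file:

  Configuration conf = new Configuration();
  OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, "struct<x:int,y:string>");
  Reader file = OrcFile.createReader(new Path("/tmp/file.orc"),
      OrcFile.readerOptions(conf)
          .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)));
  Reader.Options options =
      OrcInputFormat.buildOptions(conf, file, 0, Long.MAX_VALUE);
  RecordReader rows = file.rows(options);  // org.apache.orc.RecordReader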
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcKey.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcKey.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcKey.java
new file mode 100644
index 0000000..ad94e32
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcKey.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.mapred;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.orc.OrcConf;
+import org.apache.orc.TypeDescription;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * This type provides a wrapper for OrcStruct so that it can be sent through
+ * the MapReduce shuffle as a key.
+ *
+ * The user should set the JobConf with orc.mapred.map.output.key.schema
+ * (OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA) with the type string of the key.
+ */
+public final class OrcKey
+ implements WritableComparable<OrcKey>, JobConfigurable {
+
+ public WritableComparable key;
+
+ public OrcKey(WritableComparable key) {
+ this.key = key;
+ }
+
+ public OrcKey() {
+ key = null;
+ }
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ key.write(dataOutput);
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ key.readFields(dataInput);
+ }
+
+ @Override
+ public void configure(JobConf conf) {
+ if (key == null) {
+ TypeDescription schema =
+ TypeDescription.fromString(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA
+ .getString(conf));
+ key = OrcStruct.createValue(schema);
+ }
+ }
+
+ @Override
+ public int compareTo(OrcKey o) {
+ return key.compareTo(o.key);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || key == null) {
+ return false;
+ } else if (o.getClass() != getClass()) {
+ return false;
+ } else {
+ return key.equals(((OrcKey) o).key);
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return key.hashCode();
+ }
+}
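
When Hadoop materializes an OrcKey during the shuffle (ReflectionUtils
calls configure() on JobConfigurable instances it creates), the wrapped
OrcStruct is built from the shuffle key schema. A sketch of that life
cycle, using the same schema string as the TestMrUnit test earlier in this
patch:

  JobConf conf = new JobConf();
  OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.setString(conf, "struct<x:int,y:int>");
  OrcKey key = new OrcKey();
  key.configure(conf);  // creates an OrcStruct matching the schema
  OrcStruct struct = (OrcStruct) key.key;
  ((IntWritable) struct.getFieldValue(0)).set(1);
  ((IntWritable) struct.getFieldValue(1)).set(2);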
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcMap.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMap.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMap.java
index cf47827..38c152c 100644
--- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMap.java
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMap.java
@@ -45,7 +45,6 @@ public final class OrcMap<K extends WritableComparable,
@Override
public void write(DataOutput output) throws IOException {
- Iterator<Map.Entry<K,V>> itr = entrySet().iterator();
output.writeInt(size());
for(Map.Entry<K,V> entry: entrySet()) {
K key = entry.getKey();
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java
new file mode 100644
index 0000000..ddbc396
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+/**
+ * This record reader implements the org.apache.hadoop.mapred API.
+ * @param <V> the root type of the file
+ */
+public class OrcMapredRecordReader<V extends WritableComparable>
+ implements org.apache.hadoop.mapred.RecordReader<NullWritable, V> {
+ private final TypeDescription schema;
+ private final RecordReader batchReader;
+ private final VectorizedRowBatch batch;
+ private int rowInBatch;
+
+ protected OrcMapredRecordReader(Reader fileReader,
+ Reader.Options options) throws IOException {
+ this.batchReader = fileReader.rows(options);
+ if (options.getSchema() == null) {
+ schema = fileReader.getSchema();
+ } else {
+ schema = options.getSchema();
+ }
+ this.batch = schema.createRowBatch();
+ rowInBatch = 0;
+ }
+
+ /**
+ * If the current batch is empty, get a new one.
+ * @return true if we have rows available.
+ * @throws IOException
+ */
+ boolean ensureBatch() throws IOException {
+ if (rowInBatch >= batch.size) {
+ rowInBatch = 0;
+ return batchReader.nextBatch(batch);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean next(NullWritable key, V value) throws IOException {
+ if (!ensureBatch()) {
+ return false;
+ }
+ if (schema.getCategory() == TypeDescription.Category.STRUCT) {
+ OrcStruct result = (OrcStruct) value;
+ List<TypeDescription> children = schema.getChildren();
+ int numberOfChildren = children.size();
+ for(int i=0; i < numberOfChildren; ++i) {
+ result.setFieldValue(i, nextValue(batch.cols[i], rowInBatch,
+ children.get(i), result.getFieldValue(i)));
+ }
+ } else {
+ nextValue(batch.cols[0], rowInBatch, schema, value);
+ }
+ rowInBatch += 1;
+ return true;
+ }
+
+ @Override
+ public NullWritable createKey() {
+ return NullWritable.get();
+ }
+
+ @Override
+ public V createValue() {
+ return (V) OrcStruct.createValue(schema);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public void close() throws IOException {
+ batchReader.close();
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return 0;
+ }
+
+ static BooleanWritable nextBoolean(ColumnVector vector,
+ int row,
+ Object previous) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (vector.noNulls || !vector.isNull[row]) {
+ BooleanWritable result;
+ if (previous == null || previous.getClass() != BooleanWritable.class) {
+ result = new BooleanWritable();
+ } else {
+ result = (BooleanWritable) previous;
+ }
+ result.set(((LongColumnVector) vector).vector[row] != 0);
+ return result;
+ } else {
+ return null;
+ }
+ }
+
+ static ByteWritable nextByte(ColumnVector vector,
+ int row,
+ Object previous) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (vector.noNulls || !vector.isNull[row]) {
+ ByteWritable result;
+ if (previous == null || previous.getClass() != ByteWritable.class) {
+ result = new ByteWritable();
+ } else {
+ result = (ByteWritable) previous;
+ }
+ result.set((byte) ((LongColumnVector) vector).vector[row]);
+ return result;
+ } else {
+ return null;
+ }
+ }
+
+ static ShortWritable nextShort(ColumnVector vector,
+ int row,
+ Object previous) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (vector.noNulls || !vector.isNull[row]) {
+ ShortWritable result;
+ if (previous == null || previous.getClass() != ShortWritable.class) {
+ result = new ShortWritable();
+ } else {
+ result = (ShortWritable) previous;
+ }
+ result.set((short) ((LongColumnVector) vector).vector[row]);
+ return result;
+ } else {
+ return null;
+ }
+ }
+
+ static IntWritable nextInt(ColumnVector vector,
+ int row,
+ Object previous) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (vector.noNulls || !vector.isNull[row]) {
+ IntWritable result;
+ if (previous == null || previous.getClass() != IntWritable.class) {
+ result = new IntWritable();
+ } else {
+ result = (IntWritable) previous;
+ }
+ result.set((int) ((LongColumnVector) vector).vector[row]);
+ return result;
+ } else {
+ return null;
+ }
+ }
+
+ static LongWritable nextLong(ColumnVector vector,
+ int row,
+ Object previous) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (vector.noNulls || !vector.isNull[row]) {
+ LongWritable result;
+ if (previous == null || previous.getClass() != LongWritable.class) {
+ result = new LongWritable();
+ } else {
+ result = (LongWritable) previous;
+ }
+ result.set(((LongColumnVector) vector).vector[row]);
+ return result;
+ } else {
+ return null;
+ }
+ }
+
+ static FloatWritable nextFloat(ColumnVector vector,
+ int row,
+ Object previous) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (vector.noNulls || !vector.isNull[row]) {
+ FloatWritable result;
+ if (previous == null || previous.getClass() != FloatWritable.class) {
+ result = new FloatWritable();
+ } else {
+ result = (FloatWritable) previous;
+ }
+ result.set((float) ((DoubleColumnVector) vector).vector[row]);
+ return result;
+ } else {
+ return null;
+ }
+ }
+
+ static DoubleWritable nextDouble(ColumnVector vector,
+ int row,
+ Object previous) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (vector.noNulls || !vector.isNull[row]) {
+ DoubleWritable result;
+ if (previous == null || previous.getClass() != DoubleWritable.class) {
+ result = new DoubleWritable();
+ } else {
+ result = (DoubleWritable) previous;
+ }
+ result.set(((DoubleColumnVector) vector).vector[row]);
+ return result;
+ } else {
+ return null;
+ }
+ }
+
+ static Text nextString(ColumnVector vector,
+ int row,
+ Object previous) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (vector.noNulls || !vector.isNull[row]) {
+ Text result;
+ if (previous == null || previous.getClass() != Text.class) {
+ result = new Text();
+ } else {
+ result = (Text) previous;
+ }
+ BytesColumnVector bytes = (BytesColumnVector) vector;
+ result.set(bytes.vector[row], bytes.start[row], bytes.length[row]);
+ return result;
+ } else {
+ return null;
+ }
+ }
+
+ static BytesWritable nextBinary(ColumnVector vector,
+ int row,
+ Object previous) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (vector.noNulls || !vector.isNull[row]) {
+ BytesWritable result;
+ if (previous == null || previous.getClass() != BytesWritable.class) {
+ result = new BytesWritable();
+ } else {
+ result = (BytesWritable) previous;
+ }
+ BytesColumnVector bytes = (BytesColumnVector) vector;
+ result.set(bytes.vector[row], bytes.start[row], bytes.length[row]);
+ return result;
+ } else {
+ return null;
+ }
+ }
+
+ static HiveDecimalWritable nextDecimal(ColumnVector vector,
+ int row,
+ Object previous) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (vector.noNulls || !vector.isNull[row]) {
+ HiveDecimalWritable result;
+ if (previous == null || previous.getClass() != HiveDecimalWritable.class) {
+ result = new HiveDecimalWritable();
+ } else {
+ result = (HiveDecimalWritable) previous;
+ }
+ result.set(((DecimalColumnVector) vector).vector[row]);
+ return result;
+ } else {
+ return null;
+ }
+ }
+
+ static DateWritable nextDate(ColumnVector vector,
+ int row,
+ Object previous) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (vector.noNulls || !vector.isNull[row]) {
+ DateWritable result;
+ if (previous == null || previous.getClass() != DateWritable.class) {
+ result = new DateWritable();
+ } else {
+ result = (DateWritable) previous;
+ }
+ int date = (int) ((LongColumnVector) vector).vector[row];
+ result.set(date);
+ return result;
+ } else {
+ return null;
+ }
+ }
+
+ static OrcTimestamp nextTimestamp(ColumnVector vector,
+ int row,
+ Object previous) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (vector.noNulls || !vector.isNull[row]) {
+ OrcTimestamp result;
+ if (previous == null || previous.getClass() != OrcTimestamp.class) {
+ result = new OrcTimestamp();
+ } else {
+ result = (OrcTimestamp) previous;
+ }
+ TimestampColumnVector tcv = (TimestampColumnVector) vector;
+ result.setTime(tcv.time[row]);
+ result.setNanos(tcv.nanos[row]);
+ return result;
+ } else {
+ return null;
+ }
+ }
+
+ static OrcStruct nextStruct(ColumnVector vector,
+ int row,
+ TypeDescription schema,
+ Object previous) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (vector.noNulls || !vector.isNull[row]) {
+ OrcStruct result;
+ List<TypeDescription> childrenTypes = schema.getChildren();
+ int numChildren = childrenTypes.size();
+ if (previous == null || previous.getClass() != OrcStruct.class) {
+ result = new OrcStruct(schema);
+ } else {
+ result = (OrcStruct) previous;
+ }
+ StructColumnVector struct = (StructColumnVector) vector;
+ for(int f=0; f < numChildren; ++f) {
+ result.setFieldValue(f, nextValue(struct.fields[f], row,
+ childrenTypes.get(f), result.getFieldValue(f)));
+ }
+ return result;
+ } else {
+ return null;
+ }
+ }
+
+ static OrcUnion nextUnion(ColumnVector vector,
+ int row,
+ TypeDescription schema,
+ Object previous) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (vector.noNulls || !vector.isNull[row]) {
+ OrcUnion result;
+ List<TypeDescription> childrenTypes = schema.getChildren();
+ if (previous == null || previous.getClass() != OrcUnion.class) {
+ result = new OrcUnion(schema);
+ } else {
+ result = (OrcUnion) previous;
+ }
+ UnionColumnVector union = (UnionColumnVector) vector;
+ byte tag = (byte) union.tags[row];
+ result.set(tag, nextValue(union.fields[tag], row, childrenTypes.get(tag),
+ result.getObject()));
+ return result;
+ } else {
+ return null;
+ }
+ }
+
+ static OrcList nextList(ColumnVector vector,
+ int row,
+ TypeDescription schema,
+ Object previous) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (vector.noNulls || !vector.isNull[row]) {
+ OrcList result;
+ List<TypeDescription> childrenTypes = schema.getChildren();
+ TypeDescription valueType = childrenTypes.get(0);
+ if (previous == null ||
+ previous.getClass() != ArrayList.class) {
+ result = new OrcList(schema);
+ } else {
+ result = (OrcList) previous;
+ }
+ ListColumnVector list = (ListColumnVector) vector;
+ int length = (int) list.lengths[row];
+ int offset = (int) list.offsets[row];
+ result.ensureCapacity(length);
+ int oldLength = result.size();
+ int idx = 0;
+ while (idx < length && idx < oldLength) {
+ result.set(idx, nextValue(list.child, offset + idx, valueType,
+ result.get(idx)));
+ idx += 1;
+ }
+ if (length < oldLength) {
+ for(int i= oldLength - 1; i >= length; --i) {
+ result.remove(i);
+ }
+ } else if (oldLength < length) {
+ while (idx < length) {
+ result.add(nextValue(list.child, offset + idx, valueType, null));
+ idx += 1;
+ }
+ }
+ return result;
+ } else {
+ return null;
+ }
+ }
+
+ static OrcMap nextMap(ColumnVector vector,
+ int row,
+ TypeDescription schema,
+ Object previous) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (vector.noNulls || !vector.isNull[row]) {
+ MapColumnVector map = (MapColumnVector) vector;
+ int length = (int) map.lengths[row];
+ int offset = (int) map.offsets[row];
+ OrcMap result;
+ List<TypeDescription> childrenTypes = schema.getChildren();
+ TypeDescription keyType = childrenTypes.get(0);
+ TypeDescription valueType = childrenTypes.get(1);
+ if (previous == null ||
+ previous.getClass() != OrcMap.class) {
+ result = new OrcMap(schema);
+ } else {
+ result = (OrcMap) previous;
+ // I couldn't think of a good way to reuse the keys and value objects
+ // without even more allocations, so take the easy and safe approach.
+ result.clear();
+ }
+ for(int e=0; e < length; ++e) {
+ result.put(nextValue(map.keys, e + offset, keyType, null),
+ nextValue(map.values, e + offset, valueType, null));
+ }
+ return result;
+ } else {
+ return null;
+ }
+ }
+
+ public static WritableComparable nextValue(ColumnVector vector,
+ int row,
+ TypeDescription schema,
+ Object previous) {
+ switch (schema.getCategory()) {
+ case BOOLEAN:
+ return nextBoolean(vector, row, previous);
+ case BYTE:
+ return nextByte(vector, row, previous);
+ case SHORT:
+ return nextShort(vector, row, previous);
+ case INT:
+ return nextInt(vector, row, previous);
+ case LONG:
+ return nextLong(vector, row, previous);
+ case FLOAT:
+ return nextFloat(vector, row, previous);
+ case DOUBLE:
+ return nextDouble(vector, row, previous);
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ return nextString(vector, row, previous);
+ case BINARY:
+ return nextBinary(vector, row, previous);
+ case DECIMAL:
+ return nextDecimal(vector, row, previous);
+ case DATE:
+ return nextDate(vector, row, previous);
+ case TIMESTAMP:
+ return nextTimestamp(vector, row, previous);
+ case STRUCT:
+ return nextStruct(vector, row, schema, previous);
+ case UNION:
+ return nextUnion(vector, row, schema, previous);
+ case LIST:
+ return nextList(vector, row, schema, previous);
+ case MAP:
+ return nextMap(vector, row, schema, previous);
+ default:
+ throw new IllegalArgumentException("Unknown type " + schema);
+ }
+ }
+}
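
Since the constructor is protected, mapred users obtain this reader through
OrcInputFormat. A minimal read loop follows, with a placeholder path; note
that next() reuses the same value object on every call:

  JobConf conf = new JobConf();
  Path path = new Path("/tmp/file.orc");
  long length = path.getFileSystem(conf).getFileStatus(path).getLen();
  FileSplit split = new FileSplit(path, 0, length, (String[]) null);
  org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rows =
      new OrcInputFormat<OrcStruct>().getRecordReader(split, conf,
          Reporter.NULL);
  NullWritable key = rows.createKey();
  OrcStruct value = rows.createValue();
  while (rows.next(key, value)) {
    // process value here; the same OrcStruct is refilled each iteration
  }
  rows.close();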
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordWriter.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordWriter.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordWriter.java
new file mode 100644
index 0000000..59f89f7
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordWriter.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapred;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class OrcMapredRecordWriter<V extends Writable>
+ implements RecordWriter<NullWritable, V> {
+ private final Writer writer;
+ private final VectorizedRowBatch batch;
+ private final TypeDescription schema;
+ private final boolean isTopStruct;
+
+ public OrcMapredRecordWriter(Writer writer) {
+ this.writer = writer;
+ schema = writer.getSchema();
+ this.batch = schema.createRowBatch();
+ isTopStruct = schema.getCategory() == TypeDescription.Category.STRUCT;
+ }
+
+ static void setLongValue(ColumnVector vector, int row, long value) {
+ ((LongColumnVector) vector).vector[row] = value;
+ }
+
+ static void setDoubleValue(ColumnVector vector, int row, double value) {
+ ((DoubleColumnVector) vector).vector[row] = value;
+ }
+
+ static void setBinaryValue(ColumnVector vector, int row,
+ BinaryComparable value) {
+ ((BytesColumnVector) vector).setVal(row, value.getBytes(), 0,
+ value.getLength());
+ }
+
+ static void setBinaryValue(ColumnVector vector, int row,
+ BinaryComparable value, int maxLength) {
+ ((BytesColumnVector) vector).setVal(row, value.getBytes(), 0,
+ Math.min(maxLength, value.getLength()));
+ }
+
+ private static final ThreadLocal<byte[]> SPACE_BUFFER =
+ new ThreadLocal<byte[]>() {
+ @Override
+ protected byte[] initialValue() {
+ byte[] result = new byte[100];
+ Arrays.fill(result, (byte) ' ');
+ return result;
+ }
+ };
+
+ static void setCharValue(BytesColumnVector vector,
+ int row,
+ Text value,
+ int length) {
+ // we need to trim or pad the string with spaces to the required length
+ int actualLength = value.getLength();
+ if (actualLength >= length) {
+ setBinaryValue(vector, row, value, length);
+ } else {
+ byte[] spaces = SPACE_BUFFER.get();
+ if (length - actualLength > spaces.length) {
+ spaces = new byte[length - actualLength];
+ Arrays.fill(spaces, (byte)' ');
+ SPACE_BUFFER.set(spaces);
+ }
+ vector.setConcat(row, value.getBytes(), 0, actualLength, spaces, 0,
+ length - actualLength);
+ }
+ }
+
+ static void setStructValue(TypeDescription schema,
+ StructColumnVector vector,
+ int row,
+ OrcStruct value) {
+ List<TypeDescription> children = schema.getChildren();
+ for(int c=0; c < value.getNumFields(); ++c) {
+ setColumn(children.get(c), vector.fields[c], row, value.getFieldValue(c));
+ }
+ }
+
+ static void setUnionValue(TypeDescription schema,
+ UnionColumnVector vector,
+ int row,
+ OrcUnion value) {
+ List<TypeDescription> children = schema.getChildren();
+ int tag = value.getTag() & 0xff;
+ vector.tags[row] = tag;
+ setColumn(children.get(tag), vector.fields[tag], row, value.getObject());
+ }
+
+
+ static void setListValue(TypeDescription schema,
+ ListColumnVector vector,
+ int row,
+ OrcList value) {
+ TypeDescription elemType = schema.getChildren().get(0);
+ vector.offsets[row] = vector.childCount;
+ vector.lengths[row] = value.size();
+ vector.childCount += vector.lengths[row];
+ vector.child.ensureSize(vector.childCount, vector.offsets[row] != 0);
+ for(int e=0; e < vector.lengths[row]; ++e) {
+ setColumn(elemType, vector.child, (int) vector.offsets[row] + e,
+ (Writable) value.get(e));
+ }
+ }
+
+ static void setMapValue(TypeDescription schema,
+ MapColumnVector vector,
+ int row,
+ OrcMap<?,?> value) {
+ TypeDescription keyType = schema.getChildren().get(0);
+ TypeDescription valueType = schema.getChildren().get(1);
+ vector.offsets[row] = vector.childCount;
+ vector.lengths[row] = value.size();
+ vector.childCount += vector.lengths[row];
+ vector.keys.ensureSize(vector.childCount, vector.offsets[row] != 0);
+ vector.values.ensureSize(vector.childCount, vector.offsets[row] != 0);
+ int e = 0;
+ for(Map.Entry<?,?> entry: value.entrySet()) {
+ setColumn(keyType, vector.keys, (int) vector.offsets[row] + e,
+ (Writable) entry.getKey());
+ setColumn(valueType, vector.values, (int) vector.offsets[row] + e,
+ (Writable) entry.getValue());
+ e += 1;
+ }
+ }
+
+ public static void setColumn(TypeDescription schema,
+ ColumnVector vector,
+ int row,
+ Writable value) {
+ if (value == null) {
+ vector.noNulls = false;
+ vector.isNull[row] = true;
+ } else {
+ switch (schema.getCategory()) {
+ case BOOLEAN:
+ setLongValue(vector, row, ((BooleanWritable) value).get() ? 1 : 0);
+ break;
+ case BYTE:
+ setLongValue(vector, row, ((ByteWritable) value).get());
+ break;
+ case SHORT:
+ setLongValue(vector, row, ((ShortWritable) value).get());
+ break;
+ case INT:
+ setLongValue(vector, row, ((IntWritable) value).get());
+ break;
+ case LONG:
+ setLongValue(vector, row, ((LongWritable) value).get());
+ break;
+ case FLOAT:
+ setDoubleValue(vector, row, ((FloatWritable) value).get());
+ break;
+ case DOUBLE:
+ setDoubleValue(vector, row, ((DoubleWritable) value).get());
+ break;
+ case STRING:
+ setBinaryValue(vector, row, (Text) value);
+ break;
+ case CHAR:
+ setCharValue((BytesColumnVector) vector, row, (Text) value,
+ schema.getMaxLength());
+ break;
+ case VARCHAR:
+ setBinaryValue(vector, row, (Text) value, schema.getMaxLength());
+ break;
+ case BINARY:
+ setBinaryValue(vector, row, (BytesWritable) value);
+ break;
+ case DATE:
+ setLongValue(vector, row, ((DateWritable) value).getDays());
+ break;
+ case TIMESTAMP:
+ ((TimestampColumnVector) vector).set(row, (OrcTimestamp) value);
+ break;
+ case DECIMAL:
+ ((DecimalColumnVector) vector).set(row, (HiveDecimalWritable) value);
+ break;
+ case STRUCT:
+ setStructValue(schema, (StructColumnVector) vector, row,
+ (OrcStruct) value);
+ break;
+ case UNION:
+ setUnionValue(schema, (UnionColumnVector) vector, row,
+ (OrcUnion) value);
+ break;
+ case LIST:
+ setListValue(schema, (ListColumnVector) vector, row, (OrcList) value);
+ break;
+ case MAP:
+ setMapValue(schema, (MapColumnVector) vector, row, (OrcMap) value);
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown type " + schema);
+ }
+ }
+ }
+
+ @Override
+ public void write(NullWritable nullWritable, V v) throws IOException {
+ // if the batch is full, write it out.
+ if (batch.size == batch.getMaxSize()) {
+ writer.addRowBatch(batch);
+ batch.reset();
+ }
+
+ // add the new row
+ int row = batch.size++;
+ // skip over the OrcKey or OrcValue
+ if (v instanceof OrcKey) {
+ v = (V)((OrcKey) v).key;
+ } else if (v instanceof OrcValue) {
+ v = (V)((OrcValue) v).value;
+ }
+ if (isTopStruct) {
+ for(int f=0; f < schema.getChildren().size(); ++f) {
+ setColumn(schema.getChildren().get(f), batch.cols[f], row,
+ ((OrcStruct) v).getFieldValue(f));
+ }
+ } else {
+ setColumn(schema, batch.cols[0], row, v);
+ }
+ }
+
+ @Override
+ public void close(Reporter reporter) throws IOException {
+ if (batch.size != 0) {
+ writer.addRowBatch(batch);
+ batch.reset();
+ }
+ writer.close();
+ }
+}
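
The writer buffers rows into a VectorizedRowBatch and flushes a full batch
at a time. Outside of MapReduce it can wrap any ORC Writer directly; a
sketch with a placeholder path:

  Configuration conf = new Configuration();
  TypeDescription schema =
      TypeDescription.fromString("struct<x:int,y:string>");
  Writer orcWriter = OrcFile.createWriter(new Path("/tmp/out.orc"),
      OrcFile.writerOptions(conf).setSchema(schema));
  OrcMapredRecordWriter<OrcStruct> writer =
      new OrcMapredRecordWriter<>(orcWriter);
  OrcStruct row = (OrcStruct) OrcStruct.createValue(schema);
  ((IntWritable) row.getFieldValue(0)).set(42);
  ((Text) row.getFieldValue(1)).set("hello");
  writer.write(NullWritable.get(), row);
  writer.close(Reporter.NULL);  // flushes the last partial batch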
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcOutputFormat.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcOutputFormat.java
index 6186c83..341fbcd 100644
--- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcOutputFormat.java
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcOutputFormat.java
@@ -18,6 +18,7 @@
package org.apache.orc.mapred;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
@@ -34,9 +35,35 @@ import org.apache.orc.Writer;
import java.io.IOException;
+/**
+ * An ORC output format that satisfies the org.apache.hadoop.mapred API.
+ */
public class OrcOutputFormat<V extends Writable>
extends FileOutputFormat<NullWritable, V> {
+ /**
+ * This function builds the options for the ORC Writer based on the JobConf.
+ * @param conf the job configuration
+ * @return a new options object
+ */
+ public static OrcFile.WriterOptions buildOptions(Configuration conf) {
+ return OrcFile.writerOptions(conf)
+ .version(OrcFile.Version.byName(OrcConf.WRITE_FORMAT.getString(conf)))
+ .setSchema(TypeDescription.fromString(OrcConf.MAPRED_OUTPUT_SCHEMA
+ .getString(conf)))
+ .compress(CompressionKind.valueOf(OrcConf.COMPRESS.getString(conf)))
+ .encodingStrategy(OrcFile.EncodingStrategy.valueOf
+ (OrcConf.ENCODING_STRATEGY.getString(conf)))
+ .bloomFilterColumns(OrcConf.BLOOM_FILTER_COLUMNS.getString(conf))
+ .bloomFilterFpp(OrcConf.BLOOM_FILTER_FPP.getDouble(conf))
+ .blockSize(OrcConf.BLOCK_SIZE.getLong(conf))
+ .blockPadding(OrcConf.BLOCK_PADDING.getBoolean(conf))
+ .stripeSize(OrcConf.STRIPE_SIZE.getLong(conf))
+ .rowIndexStride((int) OrcConf.ROW_INDEX_STRIDE.getLong(conf))
+ .bufferSize((int) OrcConf.BUFFER_SIZE.getLong(conf))
+ .paddingTolerance(OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(conf));
+ }
+
@Override
public RecordWriter<NullWritable, V> getRecordWriter(FileSystem fileSystem,
JobConf conf,
@@ -45,21 +72,7 @@ public class OrcOutputFormat<V extends Writable>
) throws IOException {
Path path = getTaskOutputPath(conf, name);
Writer writer = OrcFile.createWriter(path,
- OrcFile.writerOptions(conf)
- .fileSystem(fileSystem)
- .version(OrcFile.Version.byName(OrcConf.WRITE_FORMAT.getString(conf)))
- .setSchema(TypeDescription.fromString(OrcConf.SCHEMA.getString(conf)))
- .compress(CompressionKind.valueOf(OrcConf.COMPRESS.getString(conf)))
- .encodingStrategy(OrcFile.EncodingStrategy.valueOf
- (OrcConf.ENCODING_STRATEGY.getString(conf)))
- .bloomFilterColumns(OrcConf.BLOOM_FILTER_COLUMNS.getString(conf))
- .bloomFilterFpp(OrcConf.BLOOM_FILTER_FPP.getDouble(conf))
- .blockSize(OrcConf.BLOCK_SIZE.getLong(conf))
- .blockPadding(OrcConf.BLOCK_PADDING.getBoolean(conf))
- .stripeSize(OrcConf.STRIPE_SIZE.getLong(conf))
- .rowIndexStride((int) OrcConf.ROW_INDEX_STRIDE.getLong(conf))
- .bufferSize((int) OrcConf.BUFFER_SIZE.getLong(conf))
- .paddingTolerance(OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(conf)));
- return new OrcRecordWriter<>(writer);
+ buildOptions(conf).fileSystem(fileSystem));
+ return new OrcMapredRecordWriter<>(writer);
}
}
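
Factoring the writer options out into buildOptions lets other integrations
honor the same job settings when creating writers themselves. A sketch
(the path is a placeholder; any knob left unset falls back to its OrcConf
default):

  JobConf conf = new JobConf();
  OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, "struct<x:int,y:string>");
  OrcConf.COMPRESS.setString(conf, "SNAPPY");
  Writer writer = OrcFile.createWriter(new Path("/tmp/direct.orc"),
      OrcOutputFormat.buildOptions(conf));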
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordReader.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordReader.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordReader.java
deleted file mode 100644
index f6bf635..0000000
--- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordReader.java
+++ /dev/null
@@ -1,547 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.mapred;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.orc.Reader;
-import org.apache.orc.RecordReader;
-import org.apache.orc.TypeDescription;
-
-public class OrcRecordReader<V extends WritableComparable>
- implements org.apache.hadoop.mapred.RecordReader<NullWritable, V> {
- private final TypeDescription schema;
- private final RecordReader batchReader;
- private final VectorizedRowBatch batch;
- private int rowInBatch;
-
- protected OrcRecordReader(Reader fileReader,
- Reader.Options options) throws IOException {
- this.batchReader = fileReader.rows(options);
- if (options.getSchema() == null) {
- schema = fileReader.getSchema();
- } else {
- schema = options.getSchema();
- }
- this.batch = schema.createRowBatch();
- rowInBatch = 0;
- }
-
- /**
- * If the current batch is empty, get a new one.
- * @return true if we have rows available.
- * @throws IOException
- */
- boolean ensureBatch() throws IOException {
- if (rowInBatch >= batch.size) {
- rowInBatch = 0;
- return batchReader.nextBatch(batch);
- }
- return true;
- }
-
- @Override
- public boolean next(NullWritable key, V value) throws IOException {
- if (!ensureBatch()) {
- return false;
- }
- if (schema.getCategory() == TypeDescription.Category.STRUCT) {
- OrcStruct result = (OrcStruct) value;
- List<TypeDescription> children = schema.getChildren();
- int numberOfChildren = children.size();
- for(int i=0; i < numberOfChildren; ++i) {
- result.setFieldValue(i, nextValue(batch.cols[i], rowInBatch,
- children.get(i), result.getFieldValue(i)));
- }
- } else {
- nextValue(batch.cols[0], rowInBatch, schema, value);
- }
- rowInBatch += 1;
- return true;
- }
-
- @Override
- public NullWritable createKey() {
- return NullWritable.get();
- }
-
- @Override
- public V createValue() {
- return (V) OrcStruct.createValue(schema);
- }
-
- @Override
- public long getPos() throws IOException {
- return 0;
- }
-
- @Override
- public void close() throws IOException {
- batchReader.close();
- }
-
- @Override
- public float getProgress() throws IOException {
- return 0;
- }
-
- static BooleanWritable nextBoolean(ColumnVector vector,
- int row,
- Object previous) {
- if (vector.isRepeating) {
- row = 0;
- }
- if (vector.noNulls || !vector.isNull[row]) {
- BooleanWritable result;
- if (previous == null || previous.getClass() != BooleanWritable.class) {
- result = new BooleanWritable();
- } else {
- result = (BooleanWritable) previous;
- }
- result.set(((LongColumnVector) vector).vector[row] != 0);
- return result;
- } else {
- return null;
- }
- }
-
- static ByteWritable nextByte(ColumnVector vector,
- int row,
- Object previous) {
- if (vector.isRepeating) {
- row = 0;
- }
- if (vector.noNulls || !vector.isNull[row]) {
- ByteWritable result;
- if (previous == null || previous.getClass() != ByteWritable.class) {
- result = new ByteWritable();
- } else {
- result = (ByteWritable) previous;
- }
- result.set((byte) ((LongColumnVector) vector).vector[row]);
- return result;
- } else {
- return null;
- }
- }
-
- static ShortWritable nextShort(ColumnVector vector,
- int row,
- Object previous) {
- if (vector.isRepeating) {
- row = 0;
- }
- if (vector.noNulls || !vector.isNull[row]) {
- ShortWritable result;
- if (previous == null || previous.getClass() != ShortWritable.class) {
- result = new ShortWritable();
- } else {
- result = (ShortWritable) previous;
- }
- result.set((short) ((LongColumnVector) vector).vector[row]);
- return result;
- } else {
- return null;
- }
- }
-
- static IntWritable nextInt(ColumnVector vector,
- int row,
- Object previous) {
- if (vector.isRepeating) {
- row = 0;
- }
- if (vector.noNulls || !vector.isNull[row]) {
- IntWritable result;
- if (previous == null || previous.getClass() != IntWritable.class) {
- result = new IntWritable();
- } else {
- result = (IntWritable) previous;
- }
- result.set((int) ((LongColumnVector) vector).vector[row]);
- return result;
- } else {
- return null;
- }
- }
-
- static LongWritable nextLong(ColumnVector vector,
- int row,
- Object previous) {
- if (vector.isRepeating) {
- row = 0;
- }
- if (vector.noNulls || !vector.isNull[row]) {
- LongWritable result;
- if (previous == null || previous.getClass() != LongWritable.class) {
- result = new LongWritable();
- } else {
- result = (LongWritable) previous;
- }
- result.set(((LongColumnVector) vector).vector[row]);
- return result;
- } else {
- return null;
- }
- }
-
- static FloatWritable nextFloat(ColumnVector vector,
- int row,
- Object previous) {
- if (vector.isRepeating) {
- row = 0;
- }
- if (vector.noNulls || !vector.isNull[row]) {
- FloatWritable result;
- if (previous == null || previous.getClass() != FloatWritable.class) {
- result = new FloatWritable();
- } else {
- result = (FloatWritable) previous;
- }
- result.set((float) ((DoubleColumnVector) vector).vector[row]);
- return result;
- } else {
- return null;
- }
- }
-
- static DoubleWritable nextDouble(ColumnVector vector,
- int row,
- Object previous) {
- if (vector.isRepeating) {
- row = 0;
- }
- if (vector.noNulls || !vector.isNull[row]) {
- DoubleWritable result;
- if (previous == null || previous.getClass() != DoubleWritable.class) {
- result = new DoubleWritable();
- } else {
- result = (DoubleWritable) previous;
- }
- result.set(((DoubleColumnVector) vector).vector[row]);
- return result;
- } else {
- return null;
- }
- }
-
- static Text nextString(ColumnVector vector,
- int row,
- Object previous) {
- if (vector.isRepeating) {
- row = 0;
- }
- if (vector.noNulls || !vector.isNull[row]) {
- Text result;
- if (previous == null || previous.getClass() != Text.class) {
- result = new Text();
- } else {
- result = (Text) previous;
- }
- BytesColumnVector bytes = (BytesColumnVector) vector;
- result.set(bytes.vector[row], bytes.start[row], bytes.length[row]);
- return result;
- } else {
- return null;
- }
- }
-
- static BytesWritable nextBinary(ColumnVector vector,
- int row,
- Object previous) {
- if (vector.isRepeating) {
- row = 0;
- }
- if (vector.noNulls || !vector.isNull[row]) {
- BytesWritable result;
- if (previous == null || previous.getClass() != BytesWritable.class) {
- result = new BytesWritable();
- } else {
- result = (BytesWritable) previous;
- }
- BytesColumnVector bytes = (BytesColumnVector) vector;
- result.set(bytes.vector[row], bytes.start[row], bytes.length[row]);
- return result;
- } else {
- return null;
- }
- }
-
- static HiveDecimalWritable nextDecimal(ColumnVector vector,
- int row,
- Object previous) {
- if (vector.isRepeating) {
- row = 0;
- }
- if (vector.noNulls || !vector.isNull[row]) {
- HiveDecimalWritable result;
- if (previous == null || previous.getClass() != HiveDecimalWritable.class) {
- result = new HiveDecimalWritable();
- } else {
- result = (HiveDecimalWritable) previous;
- }
- result.set(((DecimalColumnVector) vector).vector[row]);
- return result;
- } else {
- return null;
- }
- }
-
- static DateWritable nextDate(ColumnVector vector,
- int row,
- Object previous) {
- if (vector.isRepeating) {
- row = 0;
- }
- if (vector.noNulls || !vector.isNull[row]) {
- DateWritable result;
- if (previous == null || previous.getClass() != DateWritable.class) {
- result = new DateWritable();
- } else {
- result = (DateWritable) previous;
- }
- int date = (int) ((LongColumnVector) vector).vector[row];
- result.set(date);
- return result;
- } else {
- return null;
- }
- }
-
- static OrcTimestamp nextTimestamp(ColumnVector vector,
- int row,
- Object previous) {
- if (vector.isRepeating) {
- row = 0;
- }
- if (vector.noNulls || !vector.isNull[row]) {
- OrcTimestamp result;
- if (previous == null || previous.getClass() != OrcTimestamp.class) {
- result = new OrcTimestamp();
- } else {
- result = (OrcTimestamp) previous;
- }
- TimestampColumnVector tcv = (TimestampColumnVector) vector;
- result.setTime(tcv.time[row]);
- result.setNanos(tcv.nanos[row]);
- return result;
- } else {
- return null;
- }
- }
-
- static OrcStruct nextStruct(ColumnVector vector,
- int row,
- TypeDescription schema,
- Object previous) {
- if (vector.isRepeating) {
- row = 0;
- }
- if (vector.noNulls || !vector.isNull[row]) {
- OrcStruct result;
- List<TypeDescription> childrenTypes = schema.getChildren();
- int numChildren = childrenTypes.size();
- if (previous == null || previous.getClass() != OrcStruct.class) {
- result = new OrcStruct(schema);
- } else {
- result = (OrcStruct) previous;
- }
- StructColumnVector struct = (StructColumnVector) vector;
- for(int f=0; f < numChildren; ++f) {
- result.setFieldValue(f, nextValue(struct.fields[f], row,
- childrenTypes.get(f), result.getFieldValue(f)));
- }
- return result;
- } else {
- return null;
- }
- }
-
- static OrcUnion nextUnion(ColumnVector vector,
- int row,
- TypeDescription schema,
- Object previous) {
- if (vector.isRepeating) {
- row = 0;
- }
- if (vector.noNulls || !vector.isNull[row]) {
- OrcUnion result;
- List<TypeDescription> childrenTypes = schema.getChildren();
- if (previous == null || previous.getClass() != OrcUnion.class) {
- result = new OrcUnion(schema);
- } else {
- result = (OrcUnion) previous;
- }
- UnionColumnVector union = (UnionColumnVector) vector;
- byte tag = (byte) union.tags[row];
- result.set(tag, nextValue(union.fields[tag], row, childrenTypes.get(tag),
- result.getObject()));
- return result;
- } else {
- return null;
- }
- }
-
- static OrcList nextList(ColumnVector vector,
- int row,
- TypeDescription schema,
- Object previous) {
- if (vector.isRepeating) {
- row = 0;
- }
- if (vector.noNulls || !vector.isNull[row]) {
- OrcList result;
- List<TypeDescription> childrenTypes = schema.getChildren();
- TypeDescription valueType = childrenTypes.get(0);
- if (previous == null ||
- previous.getClass() != ArrayList.class) {
- result = new OrcList(schema);
- } else {
- result = (OrcList) previous;
- }
- ListColumnVector list = (ListColumnVector) vector;
- int length = (int) list.lengths[row];
- int offset = (int) list.offsets[row];
- result.ensureCapacity(length);
- int oldLength = result.size();
- int idx = 0;
- while (idx < length && idx < oldLength) {
- result.set(idx, nextValue(list.child, offset + idx, valueType,
- result.get(idx)));
- idx += 1;
- }
- if (length < oldLength) {
- for(int i= oldLength - 1; i >= length; --i) {
- result.remove(i);
- }
- } else if (oldLength < length) {
- while (idx < length) {
- result.add(nextValue(list.child, offset + idx, valueType, null));
- idx += 1;
- }
- }
- return result;
- } else {
- return null;
- }
- }
-
- static OrcMap nextMap(ColumnVector vector,
- int row,
- TypeDescription schema,
- Object previous) {
- if (vector.isRepeating) {
- row = 0;
- }
- if (vector.noNulls || !vector.isNull[row]) {
- MapColumnVector map = (MapColumnVector) vector;
- int length = (int) map.lengths[row];
- int offset = (int) map.offsets[row];
- OrcMap result;
- List<TypeDescription> childrenTypes = schema.getChildren();
- TypeDescription keyType = childrenTypes.get(0);
- TypeDescription valueType = childrenTypes.get(1);
- if (previous == null ||
- previous.getClass() != OrcMap.class) {
- result = new OrcMap(schema);
- } else {
- result = (OrcMap) previous;
- // I couldn't think of a good way to reuse the keys and value objects
- // without even more allocations, so take the easy and safe approach.
- result.clear();
- }
- for(int e=0; e < length; ++e) {
- result.put(nextValue(map.keys, e + offset, keyType, null),
- nextValue(map.values, e + offset, valueType, null));
- }
- return result;
- } else {
- return null;
- }
- }
-
- static WritableComparable nextValue(ColumnVector vector,
- int row,
- TypeDescription schema,
- Object previous) {
- switch (schema.getCategory()) {
- case BOOLEAN:
- return nextBoolean(vector, row, previous);
- case BYTE:
- return nextByte(vector, row, previous);
- case SHORT:
- return nextShort(vector, row, previous);
- case INT:
- return nextInt(vector, row, previous);
- case LONG:
- return nextLong(vector, row, previous);
- case FLOAT:
- return nextFloat(vector, row, previous);
- case DOUBLE:
- return nextDouble(vector, row, previous);
- case STRING:
- case CHAR:
- case VARCHAR:
- return nextString(vector, row, previous);
- case BINARY:
- return nextBinary(vector, row, previous);
- case DECIMAL:
- return nextDecimal(vector, row, previous);
- case DATE:
- return nextDate(vector, row, previous);
- case TIMESTAMP:
- return nextTimestamp(vector, row, previous);
- case STRUCT:
- return nextStruct(vector, row, schema, previous);
- case UNION:
- return nextUnion(vector, row, schema, previous);
- case LIST:
- return nextList(vector, row, schema, previous);
- case MAP:
- return nextMap(vector, row, schema, previous);
- default:
- throw new IllegalArgumentException("Unknown type " + schema);
- }
- }
-}
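
For orientation while reading the nextValue dispatch that this deleted file ends with: each ORC category maps to a fixed Hadoop Writable class, and the previous object is reused only when its class matches. A minimal, hedged sketch of that mapping, not part of this commit; the schema string and class name are illustrative:

    import org.apache.hadoop.hive.serde2.io.DateWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.mapred.OrcStruct;
    import org.apache.orc.mapred.OrcTimestamp;

    public class ValueMappingSketch {
      public static void main(String[] args) {
        // createValue builds the Writable tree that nextValue later refills
        // row by row; the field classes match the switch above.
        TypeDescription schema =
            TypeDescription.fromString("struct<d:date,t:timestamp,s:string>");
        OrcStruct row = (OrcStruct) OrcStruct.createValue(schema);
        DateWritable d = (DateWritable) row.getFieldValue(0);  // DATE
        OrcTimestamp t = (OrcTimestamp) row.getFieldValue(1);  // TIMESTAMP
        Text s = (Text) row.getFieldValue(2);                  // STRING
        System.out.println(d.getClass() + " " + t.getClass() + " "
            + s.getClass());
      }
    }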
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordWriter.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordWriter.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordWriter.java
deleted file mode 100644
index 4237656..0000000
--- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordWriter.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.mapred;
-
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.io.BinaryComparable;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.Writer;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-public class OrcRecordWriter<V extends Writable>
- implements RecordWriter<NullWritable, V> {
- private final Writer writer;
- private final VectorizedRowBatch batch;
- private final TypeDescription schema;
- private final boolean isTopStruct;
-
- public OrcRecordWriter(Writer writer) {
- this.writer = writer;
- schema = writer.getSchema();
- this.batch = schema.createRowBatch();
- isTopStruct = schema.getCategory() == TypeDescription.Category.STRUCT;
- }
-
- static void setLongValue(ColumnVector vector, int row, long value) {
- ((LongColumnVector) vector).vector[row] = value;
- }
-
- static void setDoubleValue(ColumnVector vector, int row, double value) {
- ((DoubleColumnVector) vector).vector[row] = value;
- }
-
- static void setBinaryValue(ColumnVector vector, int row,
- BinaryComparable value) {
- ((BytesColumnVector) vector).setVal(row, value.getBytes(), 0,
- value.getLength());
- }
-
- static void setBinaryValue(ColumnVector vector, int row,
- BinaryComparable value, int maxLength) {
- ((BytesColumnVector) vector).setVal(row, value.getBytes(), 0,
- Math.min(maxLength, value.getLength()));
- }
-
- private static final ThreadLocal<byte[]> SPACE_BUFFER =
- new ThreadLocal<byte[]>() {
- @Override
- protected byte[] initialValue() {
- byte[] result = new byte[100];
- Arrays.fill(result, (byte) ' ');
- return result;
- }
- };
-
- static void setCharValue(BytesColumnVector vector,
- int row,
- Text value,
- int length) {
- // we need to trim or pad the string with spaces to required length
- int actualLength = value.getLength();
- if (actualLength >= length) {
- setBinaryValue(vector, row, value, length);
- } else {
- byte[] spaces = SPACE_BUFFER.get();
- if (length - actualLength > spaces.length) {
- spaces = new byte[length - actualLength];
- Arrays.fill(spaces, (byte)' ');
- SPACE_BUFFER.set(spaces);
- }
- vector.setConcat(row, value.getBytes(), 0, actualLength, spaces, 0,
- length - actualLength);
- }
- }
-
- static void setStructValue(TypeDescription schema,
- StructColumnVector vector,
- int row,
- OrcStruct value) {
- List<TypeDescription> children = schema.getChildren();
- for(int c=0; c < value.getNumFields(); ++c) {
- setColumn(children.get(c), vector.fields[c], row, value.getFieldValue(c));
- }
- }
-
- static void setUnionValue(TypeDescription schema,
- UnionColumnVector vector,
- int row,
- OrcUnion value) {
- List<TypeDescription> children = schema.getChildren();
- int tag = value.getTag() & 0xff;
- vector.tags[row] = tag;
- setColumn(children.get(tag), vector.fields[tag], row, value.getObject());
- }
-
-
- static void setListValue(TypeDescription schema,
- ListColumnVector vector,
- int row,
- OrcList value) {
- TypeDescription elemType = schema.getChildren().get(0);
- vector.offsets[row] = vector.childCount;
- vector.lengths[row] = value.size();
- vector.childCount += vector.lengths[row];
- vector.child.ensureSize(vector.childCount, vector.offsets[row] != 0);
- for(int e=0; e < vector.lengths[row]; ++e) {
- setColumn(elemType, vector.child, (int) vector.offsets[row] + e,
- (Writable) value.get(e));
- }
- }
-
- static void setMapValue(TypeDescription schema,
- MapColumnVector vector,
- int row,
- OrcMap<?,?> value) {
- TypeDescription keyType = schema.getChildren().get(0);
- TypeDescription valueType = schema.getChildren().get(1);
- vector.offsets[row] = vector.childCount;
- vector.lengths[row] = value.size();
- vector.childCount += vector.lengths[row];
- vector.keys.ensureSize(vector.childCount, vector.offsets[row] != 0);
- vector.values.ensureSize(vector.childCount, vector.offsets[row] != 0);
- int e = 0;
- for(Map.Entry<?,?> entry: value.entrySet()) {
- setColumn(keyType, vector.keys, (int) vector.offsets[row] + e,
- (Writable) entry.getKey());
- setColumn(valueType, vector.values, (int) vector.offsets[row] + e,
- (Writable) entry.getValue());
- e += 1;
- }
- }
-
- static void setColumn(TypeDescription schema,
- ColumnVector vector,
- int row,
- Writable value) {
- if (value == null) {
- vector.noNulls = false;
- vector.isNull[row] = true;
- } else {
- switch (schema.getCategory()) {
- case BOOLEAN:
- setLongValue(vector, row, ((BooleanWritable) value).get() ? 1 : 0);
- break;
- case BYTE:
- setLongValue(vector, row, ((ByteWritable) value).get());
- break;
- case SHORT:
- setLongValue(vector, row, ((ShortWritable) value).get());
- break;
- case INT:
- setLongValue(vector, row, ((IntWritable) value).get());
- break;
- case LONG:
- setLongValue(vector, row, ((LongWritable) value).get());
- break;
- case FLOAT:
- setDoubleValue(vector, row, ((FloatWritable) value).get());
- break;
- case DOUBLE:
- setDoubleValue(vector, row, ((DoubleWritable) value).get());
- break;
- case STRING:
- setBinaryValue(vector, row, (Text) value);
- break;
- case CHAR:
- setCharValue((BytesColumnVector) vector, row, (Text) value,
- schema.getMaxLength());
- break;
- case VARCHAR:
- setBinaryValue(vector, row, (Text) value, schema.getMaxLength());
- break;
- case BINARY:
- setBinaryValue(vector, row, (BytesWritable) value);
- break;
- case DATE:
- setLongValue(vector, row, ((DateWritable) value).getDays());
- break;
- case TIMESTAMP:
- ((TimestampColumnVector) vector).set(row, (OrcTimestamp) value);
- break;
- case DECIMAL:
- ((DecimalColumnVector) vector).set(row, (HiveDecimalWritable) value);
- break;
- case STRUCT:
- setStructValue(schema, (StructColumnVector) vector, row,
- (OrcStruct) value);
- break;
- case UNION:
- setUnionValue(schema, (UnionColumnVector) vector, row,
- (OrcUnion) value);
- break;
- case LIST:
- setListValue(schema, (ListColumnVector) vector, row, (OrcList) value);
- break;
- case MAP:
- setMapValue(schema, (MapColumnVector) vector, row, (OrcMap) value);
- break;
- default:
- throw new IllegalArgumentException("Unknown type " + schema);
- }
- }
- }
-
- @Override
- public void write(NullWritable nullWritable, V v) throws IOException {
- // if the batch is full, write it out.
- if (batch.size == batch.getMaxSize()) {
- writer.addRowBatch(batch);
- batch.reset();
- }
-
- // add the new row
- int row = batch.size++;
- if (isTopStruct) {
- for(int f=0; f < schema.getChildren().size(); ++f) {
- setColumn(schema.getChildren().get(f), batch.cols[f], row,
- ((OrcStruct) v).getFieldValue(f));
- }
- } else {
- setColumn(schema, batch.cols[0], row, v);
- }
- }
-
- @Override
- public void close(Reporter reporter) throws IOException {
- if (batch.size != 0) {
- writer.addRowBatch(batch);
- batch.reset();
- }
- writer.close();
- }
-}
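
The write()/close() pair above captures the batching protocol the ORC writers follow: rows accumulate in a VectorizedRowBatch, a full batch is flushed and reset, and close() flushes any partial batch before closing the file. A minimal sketch of the same pattern against the core org.apache.orc.Writer API; the output path and schema are illustrative assumptions, not from this commit:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.OrcFile;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.Writer;

    public class BatchingSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        TypeDescription schema =
            TypeDescription.fromString("struct<x:int,y:string>");
        Writer writer = OrcFile.createWriter(new Path("/tmp/example.orc"),
            OrcFile.writerOptions(conf).setSchema(schema));
        VectorizedRowBatch batch = schema.createRowBatch();
        LongColumnVector x = (LongColumnVector) batch.cols[0];
        BytesColumnVector y = (BytesColumnVector) batch.cols[1];
        for (int r = 0; r < 10000; ++r) {
          int row = batch.size++;
          x.vector[row] = r;
          byte[] text = ("row " + r).getBytes(StandardCharsets.UTF_8);
          y.setVal(row, text, 0, text.length);
          if (batch.size == batch.getMaxSize()) { // full: flush, as write() does
            writer.addRowBatch(batch);
            batch.reset();
          }
        }
        if (batch.size != 0) {                    // partial: flush, as close() does
          writer.addRowBatch(batch);
          batch.reset();
        }
        writer.close();
      }
    }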
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java
index 2dd749b..51e0d60 100644
--- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java
@@ -82,6 +82,14 @@ public final class OrcStruct implements WritableComparable<OrcStruct> {
}
/**
+ * Get the schema for this object.
+ * @return the schema object
+ */
+ public TypeDescription getSchema() {
+ return schema;
+ }
+
+ /**
* Set all of the fields in the struct
* @param values the list of values for each of the fields.
*/
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcTimestamp.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcTimestamp.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcTimestamp.java
index ee97b9f..5564177 100644
--- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcTimestamp.java
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcTimestamp.java
@@ -17,12 +17,11 @@
 */
package org.apache.orc.mapred;
-
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.Date;
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcValue.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcValue.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcValue.java
new file mode 100644
index 0000000..dc9912d
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcValue.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.mapred;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.orc.OrcConf;
+import org.apache.orc.TypeDescription;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * This type provides a wrapper for OrcStruct so that it can be sent through
+ * the MapReduce shuffle as a value.
+ *
+ * The user should set the orc.mapred.value.type property in the JobConf to
+ * the type string of the value's schema.
+ */
+public final class OrcValue implements Writable, JobConfigurable {
+
+ public WritableComparable value;
+
+ public OrcValue(WritableComparable value) {
+ this.value = value;
+ }
+
+ public OrcValue() {
+ value = null;
+ }
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ value.write(dataOutput);
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ value.readFields(dataInput);
+ }
+
+ @Override
+ public void configure(JobConf conf) {
+ if (value == null) {
+ TypeDescription schema =
+ TypeDescription.fromString(OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA
+ .getString(conf));
+ value = OrcStruct.createValue(schema);
+ }
+ }
+}
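
A short, hedged usage sketch for OrcValue, assuming the orc.mapred.value.type key named in the javadoc above; the schema string is an example, and configure() is called by hand here only to show what Hadoop's ReflectionUtils-based instantiation of a JobConfigurable value would do during the shuffle:

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.orc.mapred.OrcStruct;
    import org.apache.orc.mapred.OrcValue;

    public class OrcValueSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // the key named in the javadoc above; the schema string is an example
        conf.set("orc.mapred.value.type", "struct<x:int,y:string>");
        conf.setMapOutputValueClass(OrcValue.class);

        // configure() materializes a value of the configured type on the
        // receiving side of the shuffle
        OrcValue shuffled = new OrcValue();
        shuffled.configure(conf);
        OrcStruct struct = (OrcStruct) shuffled.value;
        struct.setFieldValue(0, new IntWritable(1));
        struct.setFieldValue(1, new Text("hello"));
      }
    }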
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcInputFormat.java b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcInputFormat.java
new file mode 100644
index 0000000..9427e78
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcInputFormat.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+
+import java.io.IOException;
+
+/**
+ * An ORC input format that satisfies the org.apache.hadoop.mapreduce API.
+ */
+public class OrcInputFormat<V extends WritableComparable>
+ extends FileInputFormat<NullWritable, V> {
+
+ /**
+ * Put the given SearchArgument into the configuration for an OrcInputFormat.
+ * @param conf the configuration to modify
+ * @param sarg the SearchArgument to put in the configuration
+ * @param columnNames the list of column names for the SearchArgument
+ */
+ public static void setSearchArgument(Configuration conf,
+ SearchArgument sarg,
+ String[] columnNames) {
+ org.apache.orc.mapred.OrcInputFormat.setSearchArgument(conf, sarg,
+ columnNames);
+ }
+
+ @Override
+ public RecordReader<NullWritable, V>
+ createRecordReader(InputSplit inputSplit,
+ TaskAttemptContext taskAttemptContext
+ ) throws IOException, InterruptedException {
+ FileSplit split = (FileSplit) inputSplit;
+ Configuration conf = taskAttemptContext.getConfiguration();
+ Reader file = OrcFile.createReader(split.getPath(),
+ OrcFile.readerOptions(conf)
+ .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)));
+ return new OrcMapreduceRecordReader<>(file,
+ org.apache.orc.mapred.OrcInputFormat.buildOptions(conf,
+ file, split.getStart(), split.getLength()));
+ }
+}
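
Finally, a hedged sketch of wiring this input format into a job with predicate pushdown via setSearchArgument; the input path, column name, and predicate are illustrative assumptions, and the builder comes from the Hive storage-api SearchArgumentFactory:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
    import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
    import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.orc.mapreduce.OrcInputFormat;

    public class PushdownSketch {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setInputFormatClass(OrcInputFormat.class);
        FileInputFormat.addInputPath(job, new Path("/data/example"));

        // only read stripes/row groups whose statistics admit x < 100
        SearchArgument sarg = SearchArgumentFactory.newBuilder()
            .startAnd()
            .lessThan("x", PredicateLeaf.Type.LONG, 100L)
            .end()
            .build();
        OrcInputFormat.setSearchArgument(job.getConfiguration(), sarg,
            new String[]{"x"});
      }
    }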