Posted to commits@hive.apache.org by om...@apache.org on 2014/03/26 19:14:39 UTC
svn commit: r1581977 [5/5] - in /hive/trunk: ./
common/src/java/org/apache/hadoop/hive/common/
itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/
metastore/src/java/org/apache/hadoop/hive/metastore/
metastore/src/java/org/apache/hadoop/hi...
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java?rev=1581977&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java Wed Mar 26 18:14:37 2014
@@ -0,0 +1,1111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidTxnListImpl;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair;
+import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderKey;
+import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderPair;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Test;
+import org.mockito.MockSettings;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestOrcRawRecordMerger {
+
+ private static final Log LOG = LogFactory.getLog(TestOrcRawRecordMerger.class);
+
+ @Test
+ public void testOrdering() throws Exception {
+ ReaderKey left = new ReaderKey(100, 200, 1200, 300);
+ ReaderKey right = new ReaderKey();
+ right.setValues(100, 200, 1000, 200);
+ assertTrue(right.compareTo(left) < 0);
+ assertTrue(left.compareTo(right) > 0);
+ assertEquals(false, left.equals(right));
+ left.set(right);
+ assertTrue(right.compareTo(left) == 0);
+ assertEquals(true, right.equals(left));
+ right.setRowId(2000);
+ assertTrue(right.compareTo(left) > 0);
+ left.setValues(1, 2, 3, 4);
+ right.setValues(100, 2, 3, 4);
+ assertTrue(left.compareTo(right) < 0);
+ assertTrue(right.compareTo(left) > 0);
+ left.setValues(1, 2, 3, 4);
+ right.setValues(1, 100, 3, 4);
+ assertTrue(left.compareTo(right) < 0);
+ assertTrue(right.compareTo(left) > 0);
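+ // with equal (txn, bucket, rowId), the key with the larger currentTransaction
+ // sorts first, so the merger sees the newest event for a row before older ones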
+ left.setValues(1, 2, 3, 100);
+ right.setValues(1, 2, 3, 4);
+ assertTrue(left.compareTo(right) < 0);
+ assertTrue(right.compareTo(left) > 0);
+
+ // ensure that we are consistent when comparing to the base class
+ RecordIdentifier ri = new RecordIdentifier(1, 2, 3);
+ assertEquals(1, ri.compareTo(left));
+ assertEquals(-1, left.compareTo(ri));
+ assertEquals(false, ri.equals(left));
+ assertEquals(false, left.equals(ri));
+ }
+
+ private static void setRow(OrcStruct event,
+ int operation,
+ long originalTransaction,
+ int bucket,
+ long rowId,
+ long currentTransaction,
+ String value) {
+ event.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(operation));
+ event.setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION,
+ new LongWritable(originalTransaction));
+ event.setFieldValue(OrcRecordUpdater.BUCKET, new IntWritable(bucket));
+ event.setFieldValue(OrcRecordUpdater.ROW_ID, new LongWritable(rowId));
+ event.setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION,
+ new LongWritable(currentTransaction));
+ OrcStruct row = new OrcStruct(1);
+ row.setFieldValue(0, new Text(value));
+ event.setFieldValue(OrcRecordUpdater.ROW, row);
+ }
+
+ private static String value(OrcStruct event) {
+ return OrcRecordUpdater.getRow(event).getFieldValue(0).toString();
+ }
+
+ private List<StripeInformation> createStripes(long... rowCounts) {
+ long offset = 0;
+ List<StripeInformation> result =
+ new ArrayList<StripeInformation>(rowCounts.length);
+ for(long count: rowCounts) {
+ OrcProto.StripeInformation.Builder stripe =
+ OrcProto.StripeInformation.newBuilder();
+ stripe.setDataLength(800).setIndexLength(100).setFooterLength(100)
+ .setNumberOfRows(count).setOffset(offset);
+ offset += 1000;
+ result.add(new ReaderImpl.StripeInformationImpl(stripe.build()));
+ }
+ return result;
+ }
+
+ // verboseLogging() makes Mockito log every stubbed invocation; drop it to quiet the tests
+ private final MockSettings settings = Mockito.withSettings().verboseLogging();
+
+ private Reader createMockReader() throws IOException {
+ Reader reader = Mockito.mock(Reader.class, settings);
+ RecordReader recordReader = Mockito.mock(RecordReader.class, settings);
+ OrcStruct row1 = new OrcStruct(OrcRecordUpdater.FIELDS);
+ setRow(row1, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 20, 100, "first");
+ OrcStruct row2 = new OrcStruct(OrcRecordUpdater.FIELDS);
+ setRow(row2, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 30, 110, "second");
+ OrcStruct row3 = new OrcStruct(OrcRecordUpdater.FIELDS);
+ setRow(row3, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 40, 120, "third");
+ OrcStruct row4 = new OrcStruct(OrcRecordUpdater.FIELDS);
+ setRow(row4, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 60, 130, "fourth");
+ OrcStruct row5 = new OrcStruct(OrcRecordUpdater.FIELDS);
+ setRow(row5, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 61, 140, "fifth");
+ Mockito.when(reader.rowsOptions(Mockito.any(Reader.Options.class)))
+ .thenReturn(recordReader);
+
+ Mockito.when(recordReader.hasNext()).
+ thenReturn(true, true, true, true, true, false);
+
+ Mockito.when(recordReader.getProgress()).thenReturn(1.0f);
+
+ Mockito.when(recordReader.next(null)).thenReturn(row1);
+ Mockito.when(recordReader.next(row1)).thenReturn(row2);
+ Mockito.when(recordReader.next(row2)).thenReturn(row3);
+ Mockito.when(recordReader.next(row3)).thenReturn(row4);
+ Mockito.when(recordReader.next(row4)).thenReturn(row5);
+
+ return reader;
+ }
+
+ @Test
+ public void testReaderPair() throws Exception {
+ ReaderKey key = new ReaderKey();
+ Reader reader = createMockReader();
+ RecordIdentifier minKey = new RecordIdentifier(10, 20, 30);
+ RecordIdentifier maxKey = new RecordIdentifier(40, 50, 60);
+ ReaderPair pair = new ReaderPair(key, reader, 20, minKey, maxKey,
+ new Reader.Options());
+ RecordReader recordReader = pair.recordReader;
+ assertEquals(10, key.getTransactionId());
+ assertEquals(20, key.getBucketId());
+ assertEquals(40, key.getRowId());
+ assertEquals(120, key.getCurrentTransactionId());
+ assertEquals("third", value(pair.nextRecord));
+
+ pair.next(pair.nextRecord);
+ assertEquals(40, key.getTransactionId());
+ assertEquals(50, key.getBucketId());
+ assertEquals(60, key.getRowId());
+ assertEquals(130, key.getCurrentTransactionId());
+ assertEquals("fourth", value(pair.nextRecord));
+
+ pair.next(pair.nextRecord);
+ assertEquals(null, pair.nextRecord);
+ Mockito.verify(recordReader).close();
+ }
+
+ @Test
+ public void testReaderPairNoMin() throws Exception {
+ ReaderKey key = new ReaderKey();
+ Reader reader = createMockReader();
+
+ ReaderPair pair = new ReaderPair(key, reader, 20, null, null,
+ new Reader.Options());
+ RecordReader recordReader = pair.recordReader;
+ assertEquals(10, key.getTransactionId());
+ assertEquals(20, key.getBucketId());
+ assertEquals(20, key.getRowId());
+ assertEquals(100, key.getCurrentTransactionId());
+ assertEquals("first", value(pair.nextRecord));
+
+ pair.next(pair.nextRecord);
+ assertEquals(10, key.getTransactionId());
+ assertEquals(20, key.getBucketId());
+ assertEquals(30, key.getRowId());
+ assertEquals(110, key.getCurrentTransactionId());
+ assertEquals("second", value(pair.nextRecord));
+
+ pair.next(pair.nextRecord);
+ assertEquals(10, key.getTransactionId());
+ assertEquals(20, key.getBucketId());
+ assertEquals(40, key.getRowId());
+ assertEquals(120, key.getCurrentTransactionId());
+ assertEquals("third", value(pair.nextRecord));
+
+ pair.next(pair.nextRecord);
+ assertEquals(40, key.getTransactionId());
+ assertEquals(50, key.getBucketId());
+ assertEquals(60, key.getRowId());
+ assertEquals(130, key.getCurrentTransactionId());
+ assertEquals("fourth", value(pair.nextRecord));
+
+ pair.next(pair.nextRecord);
+ assertEquals(40, key.getTransactionId());
+ assertEquals(50, key.getBucketId());
+ assertEquals(61, key.getRowId());
+ assertEquals(140, key.getCurrentTransactionId());
+ assertEquals("fifth", value(pair.nextRecord));
+
+ pair.next(pair.nextRecord);
+ assertEquals(null, pair.nextRecord);
+ Mockito.verify(recordReader).close();
+ }
+
+ private static OrcStruct createOriginalRow(String value) {
+ OrcStruct result = new OrcStruct(1);
+ result.setFieldValue(0, new Text(value));
+ return result;
+ }
+
+ private Reader createMockOriginalReader() throws IOException {
+ Reader reader = Mockito.mock(Reader.class, settings);
+ RecordReader recordReader = Mockito.mock(RecordReader.class, settings);
+ OrcStruct row1 = createOriginalRow("first");
+ OrcStruct row2 = createOriginalRow("second");
+ OrcStruct row3 = createOriginalRow("third");
+ OrcStruct row4 = createOriginalRow("fourth");
+ OrcStruct row5 = createOriginalRow("fifth");
+
+ Mockito.when(reader.rowsOptions(Mockito.any(Reader.Options.class)))
+ .thenReturn(recordReader);
+ Mockito.when(recordReader.hasNext()).
+ thenReturn(true, true, true, true, true, false);
+ Mockito.when(recordReader.getRowNumber()).thenReturn(0L, 1L, 2L, 3L, 4L);
+ Mockito.when(recordReader.next(null)).thenReturn(row1);
+ Mockito.when(recordReader.next(row1)).thenReturn(row2);
+ Mockito.when(recordReader.next(row2)).thenReturn(row3);
+ Mockito.when(recordReader.next(row3)).thenReturn(row4);
+ Mockito.when(recordReader.next(row4)).thenReturn(row5);
+ return reader;
+ }
+
+ @Test
+ public void testOriginalReaderPair() throws Exception {
+ ReaderKey key = new ReaderKey();
+ Reader reader = createMockOriginalReader();
+ RecordIdentifier minKey = new RecordIdentifier(0, 10, 1);
+ RecordIdentifier maxKey = new RecordIdentifier(0, 10, 3);
+ boolean[] includes = new boolean[]{true, true};
+ ReaderPair pair = new OriginalReaderPair(key, reader, 10, minKey, maxKey,
+ new Reader.Options().include(includes));
+ RecordReader recordReader = pair.recordReader;
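+ // rows from a pre-ACID ("original") file get synthetic keys: transaction 0,
+ // the supplied bucket, and rowId equal to the row's position in the file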
+ assertEquals(0, key.getTransactionId());
+ assertEquals(10, key.getBucketId());
+ assertEquals(2, key.getRowId());
+ assertEquals(0, key.getCurrentTransactionId());
+ assertEquals("third", value(pair.nextRecord));
+
+ pair.next(pair.nextRecord);
+ assertEquals(0, key.getTransactionId());
+ assertEquals(10, key.getBucketId());
+ assertEquals(3, key.getRowId());
+ assertEquals(0, key.getCurrentTransactionId());
+ assertEquals("fourth", value(pair.nextRecord));
+
+ pair.next(pair.nextRecord);
+ assertEquals(null, pair.nextRecord);
+ Mockito.verify(recordReader).close();
+ }
+
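+ // ValidTxnListImpl is built from "<highWaterMark>:<excluded txn ids>"; a high water
+ // mark of Long.MAX_VALUE with nothing excluded makes every transaction visible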
+ private static ValidTxnList createMaximalTxnList() {
+ return new ValidTxnListImpl(Long.MAX_VALUE + ":");
+ }
+
+ @Test
+ public void testOriginalReaderPairNoMin() throws Exception {
+ ReaderKey key = new ReaderKey();
+ Reader reader = createMockOriginalReader();
+ ReaderPair pair = new OriginalReaderPair(key, reader, 10, null, null,
+ new Reader.Options());
+ assertEquals("first", value(pair.nextRecord));
+ assertEquals(0, key.getTransactionId());
+ assertEquals(10, key.getBucketId());
+ assertEquals(0, key.getRowId());
+ assertEquals(0, key.getCurrentTransactionId());
+
+ pair.next(pair.nextRecord);
+ assertEquals("second", value(pair.nextRecord));
+ assertEquals(0, key.getTransactionId());
+ assertEquals(10, key.getBucketId());
+ assertEquals(1, key.getRowId());
+ assertEquals(0, key.getCurrentTransactionId());
+
+ pair.next(pair.nextRecord);
+ assertEquals("third", value(pair.nextRecord));
+ assertEquals(0, key.getTransactionId());
+ assertEquals(10, key.getBucketId());
+ assertEquals(2, key.getRowId());
+ assertEquals(0, key.getCurrentTransactionId());
+
+ pair.next(pair.nextRecord);
+ assertEquals("fourth", value(pair.nextRecord));
+ assertEquals(0, key.getTransactionId());
+ assertEquals(10, key.getBucketId());
+ assertEquals(3, key.getRowId());
+ assertEquals(0, key.getCurrentTransactionId());
+
+ pair.next(pair.nextRecord);
+ assertEquals("fifth", value(pair.nextRecord));
+ assertEquals(0, key.getTransactionId());
+ assertEquals(10, key.getBucketId());
+ assertEquals(4, key.getRowId());
+ assertEquals(0, key.getCurrentTransactionId());
+
+ pair.next(pair.nextRecord);
+ assertEquals(null, pair.nextRecord);
+ Mockito.verify(pair.recordReader).close();
+ }
+
+ @Test
+ public void testNewBase() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set("columns", "col1");
+ conf.set("columns.types", "string");
+ Reader reader = Mockito.mock(Reader.class, settings);
+ RecordReader recordReader = Mockito.mock(RecordReader.class, settings);
+
+ List<OrcProto.Type> types = new ArrayList<OrcProto.Type>();
+ OrcProto.Type.Builder typeBuilder = OrcProto.Type.newBuilder();
+ typeBuilder.setKind(OrcProto.Type.Kind.STRUCT).addSubtypes(1)
+ .addSubtypes(2).addSubtypes(3).addSubtypes(4).addSubtypes(5)
+ .addSubtypes(6);
+ types.add(typeBuilder.build());
+ types.add(null);
+ types.add(null);
+ types.add(null);
+ types.add(null);
+ types.add(null);
+ typeBuilder.clearSubtypes();
+ typeBuilder.addSubtypes(7);
+ types.add(typeBuilder.build());
+
+ Mockito.when(reader.getTypes()).thenReturn(types);
+ Mockito.when(reader.rowsOptions(Mockito.any(Reader.Options.class)))
+ .thenReturn(recordReader);
+
+ OrcStruct row1 = new OrcStruct(OrcRecordUpdater.FIELDS);
+ setRow(row1, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 20, 100, "first");
+ OrcStruct row2 = new OrcStruct(OrcRecordUpdater.FIELDS);
+ setRow(row2, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 30, 110, "second");
+ OrcStruct row3 = new OrcStruct(OrcRecordUpdater.FIELDS);
+ setRow(row3, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 40, 120, "third");
+ OrcStruct row4 = new OrcStruct(OrcRecordUpdater.FIELDS);
+ setRow(row4, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 60, 130, "fourth");
+ OrcStruct row5 = new OrcStruct(OrcRecordUpdater.FIELDS);
+ setRow(row5, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 61, 140, "fifth");
+
+ Mockito.when(recordReader.hasNext()).
+ thenReturn(true, true, true, true, true, false);
+
+ Mockito.when(recordReader.getProgress()).thenReturn(1.0f);
+
+ Mockito.when(recordReader.next(null)).thenReturn(row1, row4);
+ Mockito.when(recordReader.next(row1)).thenReturn(row2);
+ Mockito.when(recordReader.next(row2)).thenReturn(row3);
+ Mockito.when(recordReader.next(row3)).thenReturn(row5);
+
+ Mockito.when(reader.getMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME))
+ .thenReturn(ByteBuffer.wrap("10,20,30;40,50,60;40,50,61"
+ .getBytes("UTF-8")));
+ Mockito.when(reader.getStripes())
+ .thenReturn(createStripes(2, 2, 1));
+
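+ // the acid key index above records the last (originalTxn, bucket, rowId) of each
+ // stripe, ';'-separated; the 1000-2000 byte range selects only the middle stripe,
+ // so the merger's min key is stripe 1's last key and its max key is stripe 2's last key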
+ OrcRawRecordMerger merger = new OrcRawRecordMerger(conf, false, reader,
+ false, 10, createMaximalTxnList(),
+ new Reader.Options().range(1000, 1000), null);
+ RecordReader rr = merger.getCurrentReader().recordReader;
+ assertEquals(0, merger.getOtherReaders().size());
+
+ assertEquals(new RecordIdentifier(10, 20, 30), merger.getMinKey());
+ assertEquals(new RecordIdentifier(40, 50, 60), merger.getMaxKey());
+ RecordIdentifier id = merger.createKey();
+ OrcStruct event = merger.createValue();
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(10, id.getTransactionId());
+ assertEquals(20, id.getBucketId());
+ assertEquals(40, id.getRowId());
+ assertEquals("third", getValue(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(40, id.getTransactionId());
+ assertEquals(50, id.getBucketId());
+ assertEquals(60, id.getRowId());
+ assertEquals("fourth", getValue(event));
+
+ assertEquals(false, merger.next(id, event));
+ assertEquals(1.0, merger.getProgress(), 0.01);
+ merger.close();
+ Mockito.verify(rr).close();
+ Mockito.verify(rr).getProgress();
+
+ StructObjectInspector eventObjectInspector =
+ (StructObjectInspector) merger.getObjectInspector();
+ List<? extends StructField> fields =
+ eventObjectInspector.getAllStructFieldRefs();
+ assertEquals(OrcRecordUpdater.FIELDS, fields.size());
+ assertEquals("operation",
+ fields.get(OrcRecordUpdater.OPERATION).getFieldName());
+ assertEquals("currentTransaction",
+ fields.get(OrcRecordUpdater.CURRENT_TRANSACTION).getFieldName());
+ assertEquals("originalTransaction",
+ fields.get(OrcRecordUpdater.ORIGINAL_TRANSACTION).getFieldName());
+ assertEquals("bucket",
+ fields.get(OrcRecordUpdater.BUCKET).getFieldName());
+ assertEquals("rowId",
+ fields.get(OrcRecordUpdater.ROW_ID).getFieldName());
+ StructObjectInspector rowObjectInspector =
+ (StructObjectInspector) fields.get(OrcRecordUpdater.ROW)
+ .getFieldObjectInspector();
+ assertEquals("col1",
+ rowObjectInspector.getAllStructFieldRefs().get(0).getFieldName());
+ }
+
+ static class MyRow {
+ Text col1;
+ MyRow(String val) {
+ col1 = new Text(val);
+ }
+ }
+
+ static String getValue(OrcStruct event) {
+ return OrcRecordUpdater.getRow(event).getFieldValue(0).toString();
+ }
+
+ @Test
+ public void testEmpty() throws Exception {
+ final int BUCKET = 0;
+ Configuration conf = new Configuration();
+ OrcOutputFormat of = new OrcOutputFormat();
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path root = new Path(System.getProperty("test.tmp.dir",
+ "target" + File.separator + "test" + File.separator + "tmp" +
+ File.separator + "testEmpty")).makeQualified(fs);
+ fs.delete(root, true);
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector
+ (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ // write the empty base
+ AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+ .inspector(inspector).bucket(BUCKET).writingBase(true)
+ .maximumTransactionId(100);
+ of.getRecordUpdater(root, options).close(false);
+
+ ValidTxnList txnList = new ValidTxnListImpl("200:");
+ AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList);
+
+ Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
+ BUCKET);
+ Reader baseReader = OrcFile.createReader(basePath,
+ OrcFile.readerOptions(conf));
+ OrcRawRecordMerger merger =
+ new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
+ createMaximalTxnList(), new Reader.Options(),
+ AcidUtils.getPaths(directory.getCurrentDirectories()));
+ RecordIdentifier key = merger.createKey();
+ OrcStruct value = merger.createValue();
+ assertEquals(false, merger.next(key, value));
+ }
+
+ /**
+ * Test the OrcRecordUpdater together with the OrcRawRecordMerger when there
+ * is a new-style base and a delta.
+ * @throws Exception
+ */
+ @Test
+ public void testNewBaseAndDelta() throws Exception {
+ final int BUCKET = 10;
+ String[] values = new String[]{"first", "second", "third", "fourth",
+ "fifth", "sixth", "seventh", "eighth",
+ "ninth", "tenth"};
+ Configuration conf = new Configuration();
+ OrcOutputFormat of = new OrcOutputFormat();
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path root = new Path(System.getProperty("test.tmp.dir",
+ "target" + File.separator + "test" + File.separator + "tmp" +
+ File.separator + "testNewBaseAndDelta")).makeQualified(fs);
+ fs.delete(root, true);
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector
+ (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ // write the base
+ AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+ .inspector(inspector).bucket(BUCKET);
+ RecordUpdater ru = of.getRecordUpdater(root,
+ options.writingBase(true).maximumTransactionId(100));
+ for(String v: values) {
+ ru.insert(0, new MyRow(v));
+ }
+ ru.close(false);
+
+ // write a delta
+ ru = of.getRecordUpdater(root, options.writingBase(false)
+ .minimumTransactionId(200).maximumTransactionId(200));
+ ru.update(200, 0, 0, new MyRow("update 1"));
+ ru.update(200, 0, 2, new MyRow("update 2"));
+ ru.update(200, 0, 3, new MyRow("update 3"));
+ ru.delete(200, 0, 7);
+ ru.delete(200, 0, 8);
+ ru.close(false);
+
+ ValidTxnList txnList = new ValidTxnListImpl("200:");
+ AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList);
+
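+ // ACID directory names encode the transaction range: base_<maxTxnId> for a base
+ // and delta_<minTxnId>_<maxTxnId> for a delta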
+ assertEquals(new Path(root, "base_0000100"), directory.getBaseDirectory());
+ assertEquals(new Path(root, "delta_0000200_0000200"),
+ directory.getCurrentDirectories().get(0).getPath());
+
+ Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
+ BUCKET);
+ Reader baseReader = OrcFile.createReader(basePath,
+ OrcFile.readerOptions(conf));
+ OrcRawRecordMerger merger =
+ new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
+ createMaximalTxnList(), new Reader.Options(),
+ AcidUtils.getPaths(directory.getCurrentDirectories()));
+ assertEquals(null, merger.getMinKey());
+ assertEquals(null, merger.getMaxKey());
+ RecordIdentifier id = merger.createKey();
+ OrcStruct event = merger.createValue();
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET, 0, 200), id);
+ assertEquals("update 1", getValue(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET, 1, 0), id);
+ assertEquals("second", getValue(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET, 2, 200), id);
+ assertEquals("update 2", getValue(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET, 3, 200), id);
+ assertEquals("update 3", getValue(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET, 4, 0), id);
+ assertEquals("fifth", getValue(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET, 5, 0), id);
+ assertEquals("sixth", getValue(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET, 6, 0), id);
+ assertEquals("seventh", getValue(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET, 7, 200), id);
+ assertEquals(null, OrcRecordUpdater.getRow(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET, 8, 200), id);
+ assertEquals(null, OrcRecordUpdater.getRow(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET, 9, 0), id);
+ assertEquals("tenth", getValue(event));
+
+ assertEquals(false, merger.next(id, event));
+ merger.close();
+
+ // make a merger that doesn't collapse events
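+ // (with collapsing off, every event for a row id is returned newest-first,
+ // rather than only the latest event per row)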
+ merger = new OrcRawRecordMerger(conf, false, baseReader, false, BUCKET,
+ createMaximalTxnList(), new Reader.Options(),
+ AcidUtils.getPaths(directory.getCurrentDirectories()));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET, 0, 200), id);
+ assertEquals("update 1", getValue(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET, 0, 0), id);
+ assertEquals("first", getValue(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET, 1, 0), id);
+ assertEquals("second", getValue(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET, 2, 200), id);
+ assertEquals("update 2", getValue(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET, 2, 0), id);
+ assertEquals("third", getValue(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET, 3, 200), id);
+ assertEquals("update 3", getValue(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET, 3, 0), id);
+ assertEquals("fourth", getValue(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET, 4, 0), id);
+ assertEquals("fifth", getValue(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET, 5, 0), id);
+ assertEquals("sixth", getValue(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET, 6, 0), id);
+ assertEquals("seventh", getValue(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET, 7, 200), id);
+ assertEquals(null, OrcRecordUpdater.getRow(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET, 7, 0), id);
+ assertEquals("eighth", getValue(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET, 8, 200), id);
+ assertEquals(null, OrcRecordUpdater.getRow(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET, 8, 0), id);
+ assertEquals("ninth", getValue(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET, 9, 0), id);
+ assertEquals("tenth", getValue(event));
+
+ assertEquals(false, merger.next(id, event));
+ merger.close();
+
+ // try ignoring the 200 transaction and make sure it works still
+ ValidTxnList txns = new ValidTxnListImpl("2000:200");
+ merger =
+ new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
+ txns, new Reader.Options(),
+ AcidUtils.getPaths(directory.getCurrentDirectories()));
+ for(int i=0; i < values.length; ++i) {
+ assertEquals(true, merger.next(id, event));
+ LOG.info("id = " + id + "event = " + event);
+ assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET, i, 0), id);
+ assertEquals(values[i], getValue(event));
+ }
+
+ assertEquals(false, merger.next(id, event));
+ merger.close();
+ }
+
+ static class BigRow {
+ int myint;
+ long mylong;
+ Text mytext;
+ float myfloat;
+ double mydouble;
+
+ BigRow(int myint, long mylong, String mytext, float myfloat, double mydouble) {
+ this.myint = myint;
+ this.mylong = mylong;
+ this.mytext = new Text(mytext);
+ this.myfloat = myfloat;
+ this.mydouble = mydouble;
+ }
+ }
+
+ /**
+ * Test the RecordReader when there is an old-style (pre-ACID) base
+ * and deltas.
+ * @throws Exception
+ */
+ @Test
+ public void testRecordReaderOldBaseAndDelta() throws Exception {
+ final int BUCKET = 10;
+ Configuration conf = new Configuration();
+ OrcOutputFormat of = new OrcOutputFormat();
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path root = new Path(System.getProperty("test.tmp.dir",
+ "target" + File.separator + "test" + File.separator + "tmp" +
+ File.separator + "testOldBaseAndDelta")).makeQualified(fs);
+ fs.delete(root, true);
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector
+ (BigRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ // write the base
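+ // (this "old" base is a plain bucket file written directly under the root, with no
+ // base_ directory, so AcidUtils treats it as pre-ACID original data and synthesizes row ids)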
+ MemoryManager mgr = new MemoryManager(conf){
+ int rowsAddedSinceCheck = 0;
+
+ synchronized void addedRow() throws IOException {
+ if (++rowsAddedSinceCheck >= 2) {
+ notifyWriters();
+ rowsAddedSinceCheck = 0;
+ }
+ }
+ };
+ // make 5 stripes with 2 rows each
+ Writer writer = OrcFile.createWriter(new Path(root, "0000010_0"),
+ OrcFile.writerOptions(conf).inspector(inspector).fileSystem(fs)
+ .blockPadding(false).bufferSize(10000).compress(CompressionKind.NONE)
+ .stripeSize(1).memory(mgr).version(OrcFile.Version.V_0_11));
+ String[] values= new String[]{"ignore.1", "0.1", "ignore.2", "ignore.3",
+ "2.0", "2.1", "3.0", "ignore.4", "ignore.5", "ignore.6"};
+ for(int i=0; i < values.length; ++i) {
+ writer.addRow(new BigRow(i, i, values[i], i, i));
+ }
+ writer.close();
+
+ // write a delta
+ AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+ .writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
+ .bucket(BUCKET).inspector(inspector).filesystem(fs);
+ RecordUpdater ru = of.getRecordUpdater(root, options);
+ values = new String[]{"0.0", null, null, "1.1", null, null, null,
+ "ignore.7"};
+ for(int i=0; i < values.length; ++i) {
+ if (values[i] != null) {
+ ru.update(1, 0, i, new BigRow(i, i, values[i], i, i));
+ }
+ }
+ ru.delete(100, 0, 9);
+ ru.close(false);
+
+ // write a delta
+ options = options.minimumTransactionId(2).maximumTransactionId(2);
+ ru = of.getRecordUpdater(root, options);
+ values = new String[]{null, null, "1.0", null, null, null, null, "3.1"};
+ for(int i=0; i < values.length; ++i) {
+ if (values[i] != null) {
+ ru.update(2, 0, i, new BigRow(i, i, values[i], i, i));
+ }
+ }
+ ru.delete(100, 0, 8);
+ ru.close(false);
+
+ InputFormat inf = new OrcInputFormat();
+ JobConf job = new JobConf();
+ job.set("mapred.min.split.size", "1");
+ job.set("mapred.max.split.size", "2");
+ job.set("mapred.input.dir", root.toString());
+ InputSplit[] splits = inf.getSplits(job, 5);
+ assertEquals(5, splits.length);
+ org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
+
+ // loop through the 5 splits and read each
+ for(int i=0; i < 4; ++i) {
+ System.out.println("starting split " + i);
+ rr = inf.getRecordReader(splits[i], job, Reporter.NULL);
+ NullWritable key = rr.createKey();
+ OrcStruct value = rr.createValue();
+
+ // there should be exactly two rows per split
+ for(int j=0; j < 2; ++j) {
+ System.out.println("i = " + i + ", j = " + j);
+ assertEquals(true, rr.next(key, value));
+ System.out.println("record = " + value);
+ assertEquals(i + "." + j, value.getFieldValue(2).toString());
+ }
+ assertEquals(false, rr.next(key, value));
+ }
+ rr = inf.getRecordReader(splits[4], job, Reporter.NULL);
+ assertEquals(false, rr.next(rr.createKey(), rr.createValue()));
+ }
+
+ /**
+ * Test the RecordReader when there is a new base and a delta.
+ * @throws Exception
+ */
+ @Test
+ public void testRecordReaderNewBaseAndDelta() throws Exception {
+ final int BUCKET = 10;
+ Configuration conf = new Configuration();
+ OrcOutputFormat of = new OrcOutputFormat();
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path root = new Path(System.getProperty("test.tmp.dir",
+ "target" + File.separator + "test" + File.separator + "tmp" +
+ File.separator + "testRecordReaderNewBaseAndDelta"))
+ .makeQualified(fs);
+ fs.delete(root, true);
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector
+ (BigRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ // write the base
+ MemoryManager mgr = new MemoryManager(conf){
+ int rowsAddedSinceCheck = 0;
+
+ synchronized void addedRow() throws IOException {
+ if (++rowsAddedSinceCheck >= 2) {
+ notifyWriters();
+ rowsAddedSinceCheck = 0;
+ }
+ }
+ };
+
+ // make 5 stripes with 2 rows each
+ OrcRecordUpdater.OrcOptions options = (OrcRecordUpdater.OrcOptions)
+ new OrcRecordUpdater.OrcOptions(conf)
+ .writingBase(true).minimumTransactionId(0).maximumTransactionId(0)
+ .bucket(BUCKET).inspector(inspector).filesystem(fs);
+ options.orcOptions(OrcFile.writerOptions(conf)
+ .stripeSize(1).blockPadding(false).compress(CompressionKind.NONE)
+ .memory(mgr));
+ RecordUpdater ru = of.getRecordUpdater(root, options);
+ String[] values= new String[]{"ignore.1", "0.1", "ignore.2", "ignore.3",
+ "2.0", "2.1", "3.0", "ignore.4", "ignore.5", "ignore.6"};
+ for(int i=0; i < values.length; ++i) {
+ ru.insert(0, new BigRow(i, i, values[i], i, i));
+ }
+ ru.close(false);
+
+ // write a delta
+ options.writingBase(false).minimumTransactionId(1).maximumTransactionId(1);
+ ru = of.getRecordUpdater(root, options);
+ values = new String[]{"0.0", null, null, "1.1", null, null, null,
+ "ignore.7"};
+ for(int i=0; i < values.length; ++i) {
+ if (values[i] != null) {
+ ru.update(1, 0, i, new BigRow(i, i, values[i], i, i));
+ }
+ }
+ ru.delete(100, 0, 9);
+ ru.close(false);
+
+ // write a delta
+ options.minimumTransactionId(2).maximumTransactionId(2);
+ ru = of.getRecordUpdater(root, options);
+ values = new String[]{null, null, "1.0", null, null, null, null, "3.1"};
+ for(int i=0; i < values.length; ++i) {
+ if (values[i] != null) {
+ ru.update(2, 0, i, new BigRow(i, i, values[i], i, i));
+ }
+ }
+ ru.delete(100, 0, 8);
+ ru.close(false);
+
+ InputFormat inf = new OrcInputFormat();
+ JobConf job = new JobConf();
+ job.set("mapred.min.split.size", "1");
+ job.set("mapred.max.split.size", "2");
+ job.set("mapred.input.dir", root.toString());
+ InputSplit[] splits = inf.getSplits(job, 5);
+ assertEquals(5, splits.length);
+ org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
+
+ // loop through the 5 splits and read each
+ for(int i=0; i < 4; ++i) {
+ System.out.println("starting split " + i);
+ rr = inf.getRecordReader(splits[i], job, Reporter.NULL);
+ NullWritable key = rr.createKey();
+ OrcStruct value = rr.createValue();
+
+ // there should be exactly two rows per split
+ for(int j=0; j < 2; ++j) {
+ System.out.println("i = " + i + ", j = " + j);
+ assertEquals(true, rr.next(key, value));
+ System.out.println("record = " + value);
+ assertEquals(i + "." + j, value.getFieldValue(2).toString());
+ }
+ assertEquals(false, rr.next(key, value));
+ }
+ rr = inf.getRecordReader(splits[4], job, Reporter.NULL);
+ assertEquals(false, rr.next(rr.createKey(), rr.createValue()));
+ }
+
+ /**
+ * Test the RecordReader when there are only delta files and no base.
+ * @throws Exception
+ */
+ @Test
+ public void testRecordReaderDelta() throws Exception {
+ final int BUCKET = 0;
+ Configuration conf = new Configuration();
+ OrcOutputFormat of = new OrcOutputFormat();
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path root = new Path(System.getProperty("test.tmp.dir",
+ "target" + File.separator + "test" + File.separator + "tmp" +
+ File.separator + "testRecordReaderDelta"))
+ .makeQualified(fs);
+ fs.delete(root, true);
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector
+ (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ // write a delta
+ AcidOutputFormat.Options options =
+ new AcidOutputFormat.Options(conf)
+ .bucket(BUCKET).inspector(inspector).filesystem(fs)
+ .writingBase(false).minimumTransactionId(1).maximumTransactionId(1);
+ RecordUpdater ru = of.getRecordUpdater(root, options);
+ String[] values = new String[]{"a", "b", "c", "d", "e"};
+ for(int i=0; i < values.length; ++i) {
+ ru.insert(1, new MyRow(values[i]));
+ }
+ ru.close(false);
+
+ // write a delta
+ options.minimumTransactionId(2).maximumTransactionId(2);
+ ru = of.getRecordUpdater(root, options);
+ values = new String[]{"f", "g", "h", "i", "j"};
+ for(int i=0; i < values.length; ++i) {
+ ru.insert(2, new MyRow(values[i]));
+ }
+ ru.close(false);
+
+ InputFormat inf = new OrcInputFormat();
+ JobConf job = new JobConf();
+ job.set("mapred.min.split.size", "1");
+ job.set("mapred.max.split.size", "2");
+ job.set("mapred.input.dir", root.toString());
+ job.set("bucket_count", "1");
+ InputSplit[] splits = inf.getSplits(job, 5);
+ assertEquals(1, splits.length);
+ org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
+ rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
+ values = new String[]{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"};
+ OrcStruct row = rr.createValue();
+ for(int i = 0; i < values.length; ++i) {
+ System.out.println("Checking " + i);
+ assertEquals(true, rr.next(NullWritable.get(), row));
+ assertEquals(values[i], row.getFieldValue(0).toString());
+ }
+ assertEquals(false, rr.next(NullWritable.get(), row));
+ }
+
+ /**
+ * Test the RecordReader when the delta has been flushed, but not closed.
+ * @throws Exception
+ */
+ @Test
+ public void testRecordReaderIncompleteDelta() throws Exception {
+ final int BUCKET = 1;
+ Configuration conf = new Configuration();
+ OrcOutputFormat of = new OrcOutputFormat();
+ FileSystem fs = FileSystem.getLocal(conf).getRaw();
+ Path root = new Path(System.getProperty("test.tmp.dir",
+ "target" + File.separator + "test" + File.separator + "tmp" +
+ File.separator + "testRecordReaderIncompleteDelta"))
+ .makeQualified(fs);
+ fs.delete(root, true);
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector
+ (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ // write a base
+ AcidOutputFormat.Options options =
+ new AcidOutputFormat.Options(conf)
+ .writingBase(true).minimumTransactionId(0).maximumTransactionId(0)
+ .bucket(BUCKET).inspector(inspector).filesystem(fs);
+ RecordUpdater ru = of.getRecordUpdater(root, options);
+ String[] values= new String[]{"1", "2", "3", "4", "5"};
+ for(int i=0; i < values.length; ++i) {
+ ru.insert(0, new MyRow(values[i]));
+ }
+ ru.close(false);
+
+ // write a delta
+ options.writingBase(false).minimumTransactionId(10)
+ .maximumTransactionId(19);
+ ru = of.getRecordUpdater(root, options);
+ values = new String[]{"6", "7", "8"};
+ for(int i=0; i < values.length; ++i) {
+ ru.insert(1, new MyRow(values[i]));
+ }
+ InputFormat inf = new OrcInputFormat();
+ JobConf job = new JobConf();
+ job.set("mapred.input.dir", root.toString());
+ job.set("bucket_count", "2");
+
+ // read the keys before the delta is flushed
+ InputSplit[] splits = inf.getSplits(job, 1);
+ assertEquals(2, splits.length);
+ org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
+ inf.getRecordReader(splits[0], job, Reporter.NULL);
+ NullWritable key = rr.createKey();
+ OrcStruct value = rr.createValue();
+ System.out.println("Looking at split " + splits[0]);
+ for(int i=1; i < 6; ++i) {
+ System.out.println("Checking row " + i);
+ assertEquals(true, rr.next(key, value));
+ assertEquals(Integer.toString(i), value.getFieldValue(0).toString());
+ }
+ assertEquals(false, rr.next(key, value));
+
+ ru.flush();
+ ru.flush();
+ values = new String[]{"9", "10"};
+ for(int i=0; i < values.length; ++i) {
+ ru.insert(3, new MyRow(values[i]));
+ }
+ ru.flush();
+
+ splits = inf.getSplits(job, 1);
+ assertEquals(2, splits.length);
+ rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
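+ // each flush appends one 8-byte length to the _flush_length side file;
+ // three flushes so far => 24 bytes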
+ Path sideFile = new Path(root +
+ "/delta_0000010_0000019/bucket_00001_flush_length");
+ assertEquals(true, fs.exists(sideFile));
+ assertEquals(24, fs.getFileStatus(sideFile).getLen());
+
+ for(int i=1; i < 11; ++i) {
+ assertEquals(true, rr.next(key, value));
+ assertEquals(Integer.toString(i), value.getFieldValue(0).toString());
+ }
+ assertEquals(false, rr.next(key, value));
+ }
+
+}
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java?rev=1581977&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java Wed Mar 26 18:14:37 2014
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestOrcRecordUpdater {
+
+ @Test
+ public void testAccessors() throws Exception {
+ OrcStruct event = new OrcStruct(OrcRecordUpdater.FIELDS);
+ event.setFieldValue(OrcRecordUpdater.OPERATION,
+ new IntWritable(OrcRecordUpdater.INSERT_OPERATION));
+ event.setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION,
+ new LongWritable(100));
+ event.setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION,
+ new LongWritable(50));
+ event.setFieldValue(OrcRecordUpdater.BUCKET, new IntWritable(200));
+ event.setFieldValue(OrcRecordUpdater.ROW_ID, new LongWritable(300));
+ assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(50, OrcRecordUpdater.getOriginalTransaction(event));
+ assertEquals(100, OrcRecordUpdater.getCurrentTransaction(event));
+ assertEquals(200, OrcRecordUpdater.getBucket(event));
+ assertEquals(300, OrcRecordUpdater.getRowId(event));
+ }
+
+ Path workDir = new Path(System.getProperty("test.tmp.dir",
+ "target" + File.separator + "test" + File.separator + "tmp"));
+
+ static class MyRow {
+ Text field;
+ MyRow(String val) {
+ field = new Text(val);
+ }
+ }
+
+ @Test
+ public void testWriter() throws Exception {
+ Path root = new Path(workDir, "testWriter");
+ Configuration conf = new Configuration();
+ // Must use raw local because the checksummer doesn't honor flushes.
+ FileSystem fs = FileSystem.getLocal(conf).getRaw();
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector
+ (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+ AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+ .filesystem(fs)
+ .bucket(10)
+ .writingBase(false)
+ .minimumTransactionId(10)
+ .maximumTransactionId(19)
+ .inspector(inspector)
+ .reporter(Reporter.NULL);
+ RecordUpdater updater = new OrcRecordUpdater(root, options);
+ updater.insert(11, new MyRow("first"));
+ updater.insert(11, new MyRow("second"));
+ updater.insert(11, new MyRow("third"));
+ updater.flush();
+ updater.insert(12, new MyRow("fourth"));
+ updater.insert(12, new MyRow("fifth"));
+ updater.flush();
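+ // each flush records the current ORC data length in the side file
+ // (<bucket>_flush_length) so a reader can read a consistent prefix of the
+ // still-open delta via maxLength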
+ Path bucketPath = AcidUtils.createFilename(root, options);
+ Path sidePath = OrcRecordUpdater.getSideFile(bucketPath);
+ DataInputStream side = fs.open(sidePath);
+
+ // read the stopping point for the first flush and make sure we only see
+ // 3 rows
+ long len = side.readLong();
+ Reader reader = OrcFile.createReader(bucketPath,
+ new OrcFile.ReaderOptions(conf).filesystem(fs).maxLength(len));
+ assertEquals(3, reader.getNumberOfRows());
+
+ // read the second flush and make sure we see all 5 rows
+ len = side.readLong();
+ side.close();
+ reader = OrcFile.createReader(bucketPath,
+ new OrcFile.ReaderOptions(conf).filesystem(fs).maxLength(len));
+ assertEquals(5, reader.getNumberOfRows());
+ RecordReader rows = reader.rows();
+
+ // check the contents of the file
+ assertEquals(true, rows.hasNext());
+ OrcStruct row = (OrcStruct) rows.next(null);
+ assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+ OrcRecordUpdater.getOperation(row));
+ assertEquals(11, OrcRecordUpdater.getCurrentTransaction(row));
+ assertEquals(11, OrcRecordUpdater.getOriginalTransaction(row));
+ assertEquals(10, OrcRecordUpdater.getBucket(row));
+ assertEquals(0, OrcRecordUpdater.getRowId(row));
+ assertEquals("first",
+ OrcRecordUpdater.getRow(row).getFieldValue(0).toString());
+ assertEquals(true, rows.hasNext());
+ row = (OrcStruct) rows.next(null);
+ assertEquals(1, OrcRecordUpdater.getRowId(row));
+ assertEquals(10, OrcRecordUpdater.getBucket(row));
+ assertEquals("second",
+ OrcRecordUpdater.getRow(row).getFieldValue(0).toString());
+ assertEquals(true, rows.hasNext());
+ row = (OrcStruct) rows.next(null);
+ assertEquals(2, OrcRecordUpdater.getRowId(row));
+ assertEquals(10, OrcRecordUpdater.getBucket(row));
+ assertEquals("third",
+ OrcRecordUpdater.getRow(row).getFieldValue(0).toString());
+ assertEquals(true, rows.hasNext());
+ row = (OrcStruct) rows.next(null);
+ assertEquals(12, OrcRecordUpdater.getCurrentTransaction(row));
+ assertEquals(12, OrcRecordUpdater.getOriginalTransaction(row));
+ assertEquals(10, OrcRecordUpdater.getBucket(row));
+ assertEquals(0, OrcRecordUpdater.getRowId(row));
+ assertEquals("fourth",
+ OrcRecordUpdater.getRow(row).getFieldValue(0).toString());
+ assertEquals(true, rows.hasNext());
+ row = (OrcStruct) rows.next(null);
+ assertEquals(1, OrcRecordUpdater.getRowId(row));
+ assertEquals("fifth",
+ OrcRecordUpdater.getRow(row).getFieldValue(0).toString());
+ assertEquals(false, rows.hasNext());
+
+ // add one more record and close
+ updater.insert(20, new MyRow("sixth"));
+ updater.close(false);
+ reader = OrcFile.createReader(bucketPath,
+ new OrcFile.ReaderOptions(conf).filesystem(fs));
+ assertEquals(6, reader.getNumberOfRows());
+ assertEquals(false, fs.exists(sidePath));
+ }
+
+ @Test
+ public void testUpdates() throws Exception {
+ Path root = new Path(workDir, "testUpdates");
+ Configuration conf = new Configuration();
+ FileSystem fs = root.getFileSystem(conf);
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector
+ (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+ AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+ .filesystem(fs)
+ .bucket(20)
+ .writingBase(false)
+ .minimumTransactionId(100)
+ .maximumTransactionId(100)
+ .inspector(inspector)
+ .reporter(Reporter.NULL);
+ RecordUpdater updater = new OrcRecordUpdater(root, options);
+ updater.update(100, 10, 30, new MyRow("update"));
+ updater.delete(100, 40, 60);
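+ // update/delete events carry the original (txn, rowId) being modified;
+ // a delete stores a null row payload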
+ updater.close(false);
+ Path bucketPath = AcidUtils.createFilename(root, options);
+
+ Reader reader = OrcFile.createReader(bucketPath,
+ new OrcFile.ReaderOptions(conf).filesystem(fs));
+ assertEquals(2, reader.getNumberOfRows());
+
+ RecordReader rows = reader.rows();
+
+ // check the contents of the file
+ assertEquals(true, rows.hasNext());
+ OrcStruct row = (OrcStruct) rows.next(null);
+ assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
+ OrcRecordUpdater.getOperation(row));
+ assertEquals(100, OrcRecordUpdater.getCurrentTransaction(row));
+ assertEquals(10, OrcRecordUpdater.getOriginalTransaction(row));
+ assertEquals(20, OrcRecordUpdater.getBucket(row));
+ assertEquals(30, OrcRecordUpdater.getRowId(row));
+ assertEquals("update",
+ OrcRecordUpdater.getRow(row).getFieldValue(0).toString());
+ assertEquals(true, rows.hasNext());
+ row = (OrcStruct) rows.next(null);
+ assertEquals(100, OrcRecordUpdater.getCurrentTransaction(row));
+ assertEquals(40, OrcRecordUpdater.getOriginalTransaction(row));
+ assertEquals(20, OrcRecordUpdater.getBucket(row));
+ assertEquals(60, OrcRecordUpdater.getRowId(row));
+ assertEquals(null, OrcRecordUpdater.getRow(row));
+ assertEquals(false, rows.hasNext());
+ }
+}
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSerDeStats.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSerDeStats.java?rev=1581977&r1=1581976&r2=1581977&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSerDeStats.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSerDeStats.java Wed Mar 26 18:14:37 2014
@@ -25,6 +25,7 @@ import static junit.framework.Assert.ass
import java.io.File;
import java.sql.Timestamp;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -96,9 +97,7 @@ public class TestOrcSerDeStats {
MiddleStruct(InnerStruct... items) {
list.clear();
- for (InnerStruct item : items) {
- list.add(item);
- }
+ list.addAll(Arrays.asList(items));
}
}
@@ -158,9 +157,7 @@ public class TestOrcSerDeStats {
private static List<InnerStruct> list(InnerStruct... items) {
List<InnerStruct> result = new ArrayList<InnerStruct>();
- for (InnerStruct s : items) {
- result.add(s);
- }
+ result.addAll(Arrays.asList(items));
return result;
}
@@ -212,7 +209,8 @@ public class TestOrcSerDeStats {
writer.close();
assertEquals(4, writer.getNumberOfRows());
assertEquals(273, writer.getRawDataSize());
- Reader reader = OrcFile.createReader(fs, testFilePath, conf);
+ Reader reader = OrcFile.createReader(testFilePath,
+ OrcFile.readerOptions(conf).filesystem(fs));
assertEquals(4, reader.getNumberOfRows());
assertEquals(273, reader.getRawDataSize());
assertEquals(15, reader.getRawDataSizeOfColumns(Lists.newArrayList("bytes1")));
@@ -248,7 +246,7 @@ public class TestOrcSerDeStats {
getStructFieldRef("bytes1").getFieldObjectInspector();
StringObjectInspector st = (StringObjectInspector) readerInspector.
getStructFieldRef("string1").getFieldObjectInspector();
- RecordReader rows = reader.rows(null);
+ RecordReader rows = reader.rows();
Object row = rows.next(null);
assertNotNull(row);
// check the contents of the first row
@@ -310,7 +308,8 @@ public class TestOrcSerDeStats {
assertEquals(5000, writer.getNumberOfRows());
assertEquals(430000000, writer.getRawDataSize());
- Reader reader = OrcFile.createReader(fs, testFilePath, conf);
+ Reader reader = OrcFile.createReader(testFilePath,
+ OrcFile.readerOptions(conf).filesystem(fs));
// stats from reader
assertEquals(5000, reader.getNumberOfRows());
assertEquals(430000000, reader.getRawDataSize());
@@ -341,7 +340,8 @@ public class TestOrcSerDeStats {
assertEquals(1000, writer.getNumberOfRows());
assertEquals(950000, writer.getRawDataSize());
- Reader reader = OrcFile.createReader(fs, testFilePath, conf);
+ Reader reader = OrcFile.createReader(testFilePath,
+ OrcFile.readerOptions(conf).filesystem(fs));
// stats from reader
assertEquals(1000, reader.getNumberOfRows());
assertEquals(950000, reader.getRawDataSize());
@@ -372,7 +372,8 @@ public class TestOrcSerDeStats {
assertEquals(1000, writer.getNumberOfRows());
assertEquals(44500, writer.getRawDataSize());
- Reader reader = OrcFile.createReader(fs, testFilePath, conf);
+ Reader reader = OrcFile.createReader(testFilePath,
+ OrcFile.readerOptions(conf).filesystem(fs));
// stats from reader
assertEquals(1000, reader.getNumberOfRows());
assertEquals(44500, reader.getRawDataSize());
@@ -413,7 +414,8 @@ public class TestOrcSerDeStats {
long rawDataSize = writer.getRawDataSize();
assertEquals(2, rowCount);
assertEquals(1740, rawDataSize);
- Reader reader = OrcFile.createReader(fs, testFilePath, conf);
+ Reader reader = OrcFile.createReader(testFilePath,
+ OrcFile.readerOptions(conf).filesystem(fs));
assertEquals(2, reader.getNumberOfRows());
assertEquals(1740, reader.getRawDataSize());
@@ -506,7 +508,8 @@ public class TestOrcSerDeStats {
long rawDataSize = writer.getRawDataSize();
assertEquals(2, rowCount);
assertEquals(1740, rawDataSize);
- Reader reader = OrcFile.createReader(fs, testFilePath, conf);
+ Reader reader = OrcFile.createReader(testFilePath,
+ OrcFile.readerOptions(conf).filesystem(fs));
assertEquals(2, reader.getNumberOfRows());
assertEquals(1740, reader.getRawDataSize());
@@ -573,7 +576,8 @@ public class TestOrcSerDeStats {
@Test(expected = ClassCastException.class)
public void testSerdeStatsOldFormat() throws Exception {
Path oldFilePath = new Path(HiveTestUtils.getFileFromClasspath("orc-file-11-format.orc"));
- Reader reader = OrcFile.createReader(fs, oldFilePath, conf);
+ Reader reader = OrcFile.createReader(oldFilePath,
+ OrcFile.readerOptions(conf).filesystem(fs));
int stripeCount = 0;
int rowCount = 0;
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java?rev=1581977&r1=1581976&r2=1581977&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java Wed Mar 26 18:14:37 2014
@@ -88,6 +88,8 @@ public class TestOrcSplitElimination {
conf.set("columns", "userid,string1,subtype,decimal1,ts");
conf.set("columns.types", "bigint,string,double,decimal,timestamp");
// needed columns
+ conf.set(ColumnProjectionUtils.READ_ALL_COLUMNS, "false");
+ conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0,2");
conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "userid,subtype");
fs = FileSystem.getLocal(conf);
testFilePath = new Path(workDir, "TestOrcFile." +
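The two added settings opt the job out of the read-all-columns default before the projected columns are named. A sketch of the equivalent setup outside the test, assuming the same five-column schema used above:

    JobConf conf = new JobConf();
    conf.set("columns", "userid,string1,subtype,decimal1,ts");
    conf.set("columns.types", "bigint,string,double,decimal,timestamp");
    // disable the read-everything default, then project by position and by name
    // (userid is column 0 and subtype is column 2 in the schema above)
    conf.set(ColumnProjectionUtils.READ_ALL_COLUMNS, "false");
    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0,2");
    conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "userid,subtype");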
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java?rev=1581977&r1=1581976&r2=1581977&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java Wed Mar 26 18:14:37 2014
@@ -22,7 +22,6 @@ import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.type.Decimal128;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.serde2.io.DateWritable;
@@ -64,6 +63,7 @@ public class TestVectorizedORCReader {
fs.delete(testFilePath, false);
}
+ @SuppressWarnings("unused")
static class MyRecord {
private final Boolean bo;
private final Byte by;
@@ -131,10 +131,12 @@ public class TestVectorizedORCReader {
private void checkVectorizedReader() throws Exception {
- Reader vreader = OrcFile.createReader(testFilePath.getFileSystem(conf), testFilePath, conf);
- Reader reader = OrcFile.createReader(testFilePath.getFileSystem(conf), testFilePath, conf);
- RecordReaderImpl vrr = (RecordReaderImpl) vreader.rows(null);
- RecordReaderImpl rr = (RecordReaderImpl) reader.rows(null);
+ Reader vreader = OrcFile.createReader(testFilePath,
+ OrcFile.readerOptions(conf));
+ Reader reader = OrcFile.createReader(testFilePath,
+ OrcFile.readerOptions(conf));
+ RecordReaderImpl vrr = (RecordReaderImpl) vreader.rows();
+ RecordReaderImpl rr = (RecordReaderImpl) reader.rows();
VectorizedRowBatch batch = null;
OrcStruct row = null;
@@ -142,7 +144,7 @@ public class TestVectorizedORCReader {
while (vrr.hasNext()) {
batch = vrr.nextBatch(batch);
for (int i = 0; i < batch.size; i++) {
- row = (OrcStruct) rr.next((Object) row);
+ row = (OrcStruct) rr.next(row);
for (int j = 0; j < batch.cols.length; j++) {
Object a = (row.getFieldValue(j));
Object b = batch.cols[j].getWritableObject(i);
Modified: hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java?rev=1581977&r1=1581976&r2=1581977&view=diff
==============================================================================
--- hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (original)
+++ hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java Wed Mar 26 18:14:37 2014
@@ -28,6 +28,7 @@ import java.net.URL;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
@@ -43,6 +44,7 @@ import javax.security.auth.login.LoginEx
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -617,29 +619,11 @@ public class Hadoop20Shims implements Ha
}
@Override
- public Iterator<FileStatus> listLocatedStatus(final FileSystem fs,
+ public List<FileStatus> listLocatedStatus(final FileSystem fs,
final Path path,
final PathFilter filter
) throws IOException {
- return new Iterator<FileStatus>() {
- private final FileStatus[] result = fs.listStatus(path, filter);
- private int current = 0;
-
- @Override
- public boolean hasNext() {
- return current < result.length;
- }
-
- @Override
- public FileStatus next() {
- return result[current++];
- }
-
- @Override
- public void remove() {
- throw new IllegalArgumentException("Not supported");
- }
- };
+ return Arrays.asList(fs.listStatus(path, filter));
}
@Override
@@ -649,6 +633,11 @@ public class Hadoop20Shims implements Ha
}
@Override
+ public void hflush(FSDataOutputStream stream) throws IOException {
+ stream.sync();
+ }
+
+ @Override
public void authorizeProxyAccess(String proxyUser, UserGroupInformation realUserUgi,
String ipAddress, Configuration conf) throws IOException {
// This hadoop version doesn't have proxy verification
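Returning a List rather than a bespoke Iterator simplifies callers of the shim. A sketch of the caller side, assuming a HadoopShims instance is in scope (for example via ShimLoader.getHadoopShims(), which is not part of this diff):

    // List-based API: a plain for-each replaces the hand-rolled Iterator.
    List<FileStatus> children = shims.listLocatedStatus(fs, path, filter);
    for (FileStatus stat : children) {
      BlockLocation[] locations = shims.getLocations(fs, stat);
      // ... build splits from stat and locations ...
    }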
Modified: hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1581977&r1=1581976&r2=1581977&view=diff
==============================================================================
--- hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original)
+++ hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Wed Mar 26 18:14:37 2014
@@ -21,16 +21,19 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
+import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.net.URI;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -367,29 +370,11 @@ public class Hadoop20SShims extends Hado
}
@Override
- public Iterator<FileStatus> listLocatedStatus(final FileSystem fs,
- final Path path,
- final PathFilter filter
- ) throws IOException {
- return new Iterator<FileStatus>() {
- private final FileStatus[] result = fs.listStatus(path, filter);
- private int current = 0;
-
- @Override
- public boolean hasNext() {
- return current < result.length;
- }
-
- @Override
- public FileStatus next() {
- return result[current++];
- }
-
- @Override
- public void remove() {
- throw new IllegalArgumentException("Not supported");
- }
- };
+ public List<FileStatus> listLocatedStatus(final FileSystem fs,
+ final Path path,
+ final PathFilter filter
+ ) throws IOException {
+ return Arrays.asList(fs.listStatus(path, filter));
}
@Override
@@ -399,6 +384,11 @@ public class Hadoop20SShims extends Hado
}
@Override
+ public void hflush(FSDataOutputStream stream) throws IOException {
+ stream.sync();
+ }
+
+ @Override
public FileSystem createProxyFileSystem(FileSystem fs, URI uri) {
return new ProxyFileSystem(fs, uri);
}
Modified: hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1581977&r1=1581976&r2=1581977&view=diff
==============================================================================
--- hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original)
+++ hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Wed Mar 26 18:14:37 2014
@@ -22,8 +22,10 @@ import java.lang.Integer;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.net.URI;
@@ -34,6 +36,7 @@ import org.apache.commons.lang.StringUti
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -465,51 +468,19 @@ public class Hadoop23Shims extends Hadoo
}
@Override
- public Iterator<FileStatus> listLocatedStatus(final FileSystem fs,
- final Path path,
- final PathFilter filter
- ) throws IOException {
- return new Iterator<FileStatus>() {
- private final RemoteIterator<LocatedFileStatus> inner =
- fs.listLocatedStatus(path);
- private FileStatus next;
- {
- next = null;
- while (inner.hasNext() && next == null) {
- next = inner.next();
- if (filter != null && !filter.accept(next.getPath())) {
- next = null;
- }
- }
- }
-
- @Override
- public boolean hasNext() {
- return next != null;
- }
-
- @Override
- public FileStatus next() {
- FileStatus result = next;
- next = null;
- try {
- while (inner.hasNext() && next == null) {
- next = inner.next();
- if (filter != null && !filter.accept(next.getPath())) {
- next = null;
- }
- }
- } catch (IOException ioe) {
- throw new IllegalArgumentException("Iterator exception", ioe);
- }
- return result;
+ public List<FileStatus> listLocatedStatus(final FileSystem fs,
+ final Path path,
+ final PathFilter filter
+ ) throws IOException {
+ RemoteIterator<LocatedFileStatus> itr = fs.listLocatedStatus(path);
+ List<FileStatus> result = new ArrayList<FileStatus>();
+ while (itr.hasNext()) {
+ FileStatus stat = itr.next();
+ if (filter == null || filter.accept(stat.getPath())) {
+ result.add(stat);
}
-
- @Override
- public void remove() {
- throw new IllegalArgumentException("Not supported");
- }
- };
+ }
+ return result;
}
@Override
@@ -522,6 +493,11 @@ public class Hadoop23Shims extends Hadoo
}
}
+ @Override
+ public void hflush(FSDataOutputStream stream) throws IOException {
+ stream.hflush();
+ }
+
class ProxyFileSystem23 extends ProxyFileSystem {
public ProxyFileSystem23(FileSystem fs) {
super(fs);
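The new hflush shim hides the Hadoop rename from sync() (0.20, 0.20S) to hflush() (0.23). A sketch of a caller flushing a file mid-write, assuming fs, path, and a shims instance are in scope:

    FSDataOutputStream out = fs.create(path);
    out.writeBytes("rows that readers should be able to see immediately");
    // flush client-side buffers so concurrent readers see the bytes;
    // the shim calls stream.sync() on 0.20/0.20S and stream.hflush() on 0.23
    shims.hflush(out);
    // ... continue writing, then close as usual ...
    out.close();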
Modified: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1581977&r1=1581976&r2=1581977&view=diff
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java Wed Mar 26 18:14:37 2014
@@ -38,6 +38,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -448,11 +449,11 @@ public interface HadoopShims {
* @param fs the file system
* @param path the directory name to get the status and block locations
* @param filter a filter that needs to accept the file (or null)
- * @return an iterator for the located file status objects
+ * @return a list of the located file status objects
* @throws IOException
*/
- Iterator<FileStatus> listLocatedStatus(FileSystem fs, Path path,
- PathFilter filter) throws IOException;
+ List<FileStatus> listLocatedStatus(FileSystem fs, Path path,
+ PathFilter filter) throws IOException;
/**
* For file status returned by listLocatedStatus, convert them into a list
@@ -465,6 +466,13 @@ public interface HadoopShims {
BlockLocation[] getLocations(FileSystem fs,
FileStatus status) throws IOException;
+ /**
+ * Flush the given stream and make its changes visible to other readers.
+ * @param stream the stream to hflush.
+ * @throws IOException
+ */
+ public void hflush(FSDataOutputStream stream) throws IOException;
+
public HCatHadoopShims getHCatShim();
public interface HCatHadoopShims {