Posted to commits@hive.apache.org by ha...@apache.org on 2015/09/17 03:15:42 UTC
hive git commit: HIVE-11583 : When PTF is used over large partitions, the result could be corrupted (Illya Yalovyy via Ashutosh Chauhan)
Repository: hive
Updated Branches:
refs/heads/master 7be02aec7 -> 8d524e062
HIVE-11583 : When PTF is used over large partitions, the result could be corrupted (Illya Yalovyy via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8d524e06
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8d524e06
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8d524e06
Branch: refs/heads/master
Commit: 8d524e062e6a8ad8c592e6067cea254c054797cd
Parents: 7be02ae
Author: Illya Yalovyy <ya...@amazon.com>
Authored: Mon Sep 14 10:18:00 2015 -0800
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Wed Sep 16 18:15:08 2015 -0700
----------------------------------------------------------------------
.../ql/exec/persistence/PTFRowContainer.java | 14 +++++----
.../hive/ql/exec/persistence/RowContainer.java | 12 +++++---
.../exec/persistence/TestPTFRowContainer.java | 31 ++++++++++++++------
3 files changed, 38 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/8d524e06/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
index d2bfea6..61cc6e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
@@ -81,8 +81,8 @@ import org.apache.hadoop.util.Progressable;
*/
public class PTFRowContainer<Row extends List<Object>> extends RowContainer<Row> {
- ArrayList<BlockInfo> blockInfos;
- int currentReadBlockStartRow;
+ private ArrayList<BlockInfo> blockInfos;
+ private int currentReadBlockStartRow;
public PTFRowContainer(int bs, Configuration jc, Reporter reporter
) throws HiveException {
@@ -190,14 +190,16 @@ public class PTFRowContainer<Row extends List<Object>> extends RowContainer<Row>
BlockInfo bI = blockInfos.get(blockNum);
int startSplit = bI.startingSplit;
- int endSplit = startSplit;
- if ( blockNum != blockInfos.size() - 1) {
- endSplit = blockInfos.get(blockNum+1).startingSplit;
+ int endSplit;
+ if ( blockNum != blockInfos.size() - 1 ) {
+ endSplit = blockInfos.get(blockNum + 1).startingSplit;
+ } else {
+ endSplit = getLastActualSplit();
}
try {
int readIntoOffset = 0;
- for(int i = startSplit; i <= endSplit; i++ ) {
+ for(int i = startSplit; i <= endSplit && readIntoOffset < getBlockSize(); i++ ) {
org.apache.hadoop.mapred.RecordReader rr = setReaderAtSplit(i);
if ( i == startSplit ) {
((PTFSequenceFileRecordReader)rr).seek(bI.startOffset);
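The effect of this hunk: before the change, the last logical block's endSplit was left equal to its startSplit, so when a block's rows spilled into later file splits the read stopped at the first split and the remaining rows of the partition were silently dropped. The fix scans through the last actual split and additionally bounds the loop by the block size. A standalone sketch of that logic (SPLITS, BLOCK_SIZE, and readBlock are simplified stand-ins, not the real PTFRowContainer members):

import java.util.ArrayList;
import java.util.List;

public class LastBlockReadSketch {
  // Three hypothetical file splits of three rows each; one logical block
  // of five rows starts inside split 0 at offset 1.
  static final String[][] SPLITS = {
      {"r0", "r1", "r2"}, {"r3", "r4", "r5"}, {"r6", "r7", "r8"}};
  static final int BLOCK_SIZE = 5;

  // Read one block, scanning splits up to and including endSplit and
  // stopping as soon as the block is full (the new readIntoOffset guard).
  static List<String> readBlock(int startSplit, int startOffset, int endSplit) {
    List<String> block = new ArrayList<>();
    for (int i = startSplit; i <= endSplit && block.size() < BLOCK_SIZE; i++) {
      int from = (i == startSplit) ? startOffset : 0;
      for (int j = from; j < SPLITS[i].length && block.size() < BLOCK_SIZE; j++) {
        block.add(SPLITS[i][j]);
      }
    }
    return block;
  }

  public static void main(String[] args) {
    // Old behavior for the last block: endSplit == startSplit.
    System.out.println(readBlock(0, 1, 0));                 // [r1, r2] -- truncated
    // New behavior: endSplit == last actual split.
    System.out.println(readBlock(0, 1, SPLITS.length - 1)); // [r1, r2, r3, r4, r5]
  }
}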
http://git-wip-us.apache.org/repos/asf/hive/blob/8d524e06/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
index 4252bd1..68dc482 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
@@ -103,7 +103,7 @@ public class RowContainer<ROW extends List<Object>>
boolean firstCalled = false; // once called first, it will never be able to
// write again.
- int acutalSplitNum = 0;
+ private int actualSplitNum = 0;
int currentSplitPointer = 0;
org.apache.hadoop.mapred.RecordReader rr = null; // record reader
RecordWriter rw = null;
@@ -220,7 +220,7 @@ public class RowContainer<ROW extends List<Object>>
HiveConf.setVar(localJc, HiveConf.ConfVars.HADOOPMAPREDINPUTDIR,
org.apache.hadoop.util.StringUtils.escapeString(parentFile.getAbsolutePath()));
inputSplits = inputFormat.getSplits(localJc, 1);
- acutalSplitNum = inputSplits.length;
+ actualSplitNum = inputSplits.length;
}
currentSplitPointer = 0;
rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer],
@@ -375,7 +375,7 @@ public class RowContainer<ROW extends List<Object>>
}
}
- if (nextSplit && this.currentSplitPointer < this.acutalSplitNum) {
+ if (nextSplit && this.currentSplitPointer < this.actualSplitNum) {
JobConf localJc = getLocalFSJobConfClone(jc);
// open record reader to read next split
rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer], jobCloneUsingLocalFs,
@@ -421,7 +421,7 @@ public class RowContainer<ROW extends List<Object>>
addCursor = 0;
numFlushedBlocks = 0;
this.readBlockSize = 0;
- this.acutalSplitNum = 0;
+ this.actualSplitNum = 0;
this.currentSplitPointer = -1;
this.firstCalled = false;
this.inputSplits = null;
@@ -606,4 +606,8 @@ public class RowContainer<ROW extends List<Object>>
clearRows();
currentReadBlock = firstReadBlockPointer = currentWriteBlock = null;
}
+
+ protected int getLastActualSplit() {
+ return actualSplitNum - 1;
+ }
}
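Besides correcting the long-standing "acutalSplitNum" typo, this hunk makes the field private and exposes the last split index through a protected accessor, which is what PTFRowContainer now uses for its last block. A minimal sketch of the pattern (Container, PTFContainer, and onSplitsComputed are hypothetical stand-ins, not the real Hive classes):

class Container {
  private int actualSplitNum = 0;        // previously package-visible and misspelled

  void onSplitsComputed(int numSplits) { // hypothetical setter for the sketch
    actualSplitNum = numSplits;
  }

  protected int getLastActualSplit() {   // subclasses read the index, not the field
    return actualSplitNum - 1;
  }
}

class PTFContainer extends Container {
  int endSplitForLastBlock() {
    return getLastActualSplit();         // replaces the old endSplit = startSplit
  }
}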
http://git-wip-us.apache.org/repos/asf/hive/blob/8d524e06/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestPTFRowContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestPTFRowContainer.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestPTFRowContainer.java
index a404ff0..0611072 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestPTFRowContainer.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestPTFRowContainer.java
@@ -18,12 +18,14 @@
package org.apache.hadoop.hive.ql.exec.persistence;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
@@ -37,11 +39,13 @@ import org.apache.hadoop.io.Text;
import org.junit.BeforeClass;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
@SuppressWarnings("deprecation")
public class TestPTFRowContainer {
- private static final String COL_NAMES = "x,y,z,a,b";
- private static final String COL_TYPES = "int,string,double,int,string";
+ private static final String COL_NAMES = "x,y,z,a,b,v";
+ private static final String COL_TYPES = "int,string,double,int,string,string";
static SerDe serDe;
static Configuration cfg;
@@ -70,7 +74,7 @@ public class TestPTFRowContainer {
return rc;
}
- private void runTest(int sz, int blockSize) throws SerDeException, HiveException {
+ private void runTest(int sz, int blockSize, String value) throws SerDeException, HiveException {
List<Object> row;
PTFRowContainer<List<Object>> rc = rowContainer(blockSize);
@@ -82,16 +86,17 @@ public class TestPTFRowContainer {
row.add(new DoubleWritable(i));
row.add(new IntWritable(i));
row.add(new Text("def " + i));
+ row.add(new Text(value));
rc.addRow(row);
}
// test forward scan
- assert(rc.rowCount() == sz);
+ assertEquals(sz, rc.rowCount());
i = 0;
row = new ArrayList<Object>();
row = rc.first();
while(row != null ) {
- assert(row.get(1).toString().equals("abc " + i));
+ assertEquals("abc " + i, row.get(1).toString());
i++;
row = rc.next();
}
@@ -100,7 +105,7 @@ public class TestPTFRowContainer {
row = rc.first();
for(i = sz - 1; i >= 0; i-- ) {
row = rc.getAt(i);
- assert(row.get(1).toString().equals("abc " + i));
+ assertEquals("abc " + i, row.get(1).toString());
}
Random r = new Random(1000L);
@@ -109,20 +114,23 @@ public class TestPTFRowContainer {
for(i=0; i < 100; i++) {
int j = r.nextInt(sz);
row = rc.getAt(j);
- assert(row.get(1).toString().equals("abc " + j));
+ assertEquals("abc " + j, row.get(1).toString());
}
// intersperse getAt and next calls
for(i=0; i < 100; i++) {
int j = r.nextInt(sz);
row = rc.getAt(j);
- assert(row.get(1).toString().equals("abc " + j));
+ assertEquals("abc " + j, row.get(1).toString());
for(int k = j + 1; k < j + (blockSize/4) && k < sz; k++) {
row = rc.next();
- assert(row.get(4).toString().equals("def " + k));
+ assertEquals("def " + k, row.get(4).toString());
}
}
+ }
+ private void runTest(int sz, int blockSize) throws SerDeException, HiveException {
+ runTest(sz, blockSize, "");
}
@Test
@@ -134,4 +142,9 @@ public class TestPTFRowContainer {
public void testSmallBlockSize() throws SerDeException, HiveException {
runTest(10 * 1000, 5);
}
+
+ @Test
+ public void testBlocksLargerThanSplit() throws SerDeException, HiveException, IOException {
+ runTest(5, 2, new String(new char[(int)FileSystem.getLocal(cfg).getDefaultBlockSize()]));
+ }
}
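The new testBlocksLargerThanSplit targets exactly the corrupting case: the added sixth column is filled with a value as long as the local filesystem's default block size, so spilled PTFRowContainer blocks can straddle split boundaries, while the container block size of 2 keeps rows per block small. A hedged standalone illustration of how such a padding value is built (SplitSpanningValue is a hypothetical class name; the Configuration and FileSystem calls match the ones in the test):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class SplitSpanningValue {
  public static void main(String[] args) throws Exception {
    Configuration cfg = new Configuration();
    // A '\0'-filled string as long as the local FS default block size
    // pushes each spilled container block across a split boundary.
    long fsBlockSize = FileSystem.getLocal(cfg).getDefaultBlockSize();
    String padding = new String(new char[(int) fsBlockSize]);
    System.out.println("padding length = " + padding.length());
  }
}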