Posted to commits@hive.apache.org by se...@apache.org on 2015/09/18 22:35:34 UTC

[29/41] hive git commit: HIVE-11583 : When PTF is used over large partitions, the result could be corrupted (Illya Yalovyy via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8d524e06
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8d524e06
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8d524e06

Branch: refs/heads/llap
Commit: 8d524e062e6a8ad8c592e6067cea254c054797cd
Parents: 7be02ae
Author: Illya Yalovyy <ya...@amazon.com>
Authored: Mon Sep 14 10:18:00 2015 -0800
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Wed Sep 16 18:15:08 2015 -0700

----------------------------------------------------------------------
 .../ql/exec/persistence/PTFRowContainer.java    | 14 +++++----
 .../hive/ql/exec/persistence/RowContainer.java  | 12 +++++---
 .../exec/persistence/TestPTFRowContainer.java   | 31 ++++++++++++++------
 3 files changed, 38 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/8d524e06/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
index d2bfea6..61cc6e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
@@ -81,8 +81,8 @@ import org.apache.hadoop.util.Progressable;
  */
 public class PTFRowContainer<Row extends List<Object>> extends RowContainer<Row> {
 
-  ArrayList<BlockInfo> blockInfos;
-  int currentReadBlockStartRow;
+  private ArrayList<BlockInfo> blockInfos;
+  private int currentReadBlockStartRow;
 
   public PTFRowContainer(int bs, Configuration jc, Reporter reporter
       ) throws HiveException {
@@ -190,14 +190,16 @@ public class PTFRowContainer<Row extends List<Object>> extends RowContainer<Row>
 
     BlockInfo bI = blockInfos.get(blockNum);
     int startSplit = bI.startingSplit;
-    int endSplit = startSplit;
-    if ( blockNum != blockInfos.size() - 1) {
-      endSplit = blockInfos.get(blockNum+1).startingSplit;
+    int endSplit;
+    if ( blockNum != blockInfos.size() - 1 ) {
+      endSplit = blockInfos.get(blockNum + 1).startingSplit;
+    } else {
+      endSplit = getLastActualSplit();
     }
 
     try {
       int readIntoOffset = 0;
-      for(int i = startSplit; i <= endSplit; i++ ) {
+      for(int i = startSplit; i <= endSplit && readIntoOffset < getBlockSize(); i++ ) {
         org.apache.hadoop.mapred.RecordReader rr = setReaderAtSplit(i);
         if ( i == startSplit ) {
           ((PTFSequenceFileRecordReader)rr).seek(bI.startOffset);
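
The hunk above is the heart of the fix. Previously, the last block's endSplit was
pinned to its starting split, so a block that spilled across split boundaries was
only partially read back; the fix scans through the last actual split, and the new
readIntoOffset guard stops every block once it is full. A minimal, self-contained
sketch of the corrected bounds follows; the array-backed splits, the readBlock
signature, and the constants are illustrative stand-ins, not Hive's API.

    // Minimal model: splits are int[] arrays, a block is a fixed-size buffer.
    // BLOCK_SIZE, SPLITS and readBlock() are illustrative, not Hive API.
    public class BlockReadSketch {
      static final int BLOCK_SIZE = 4;
      static final int[][] SPLITS = { {0, 1, 2}, {3, 4, 5}, {6, 7} };

      static int[] readBlock(int startSplit, int startOffset, int endSplit) {
        int[] block = new int[BLOCK_SIZE];
        int readIntoOffset = 0;
        // The corrected bounds: scan through endSplit, but stop once the block is full.
        for (int i = startSplit; i <= endSplit && readIntoOffset < BLOCK_SIZE; i++) {
          int off = (i == startSplit) ? startOffset : 0;  // seek only within the first split
          int[] split = SPLITS[i];
          while (off < split.length && readIntoOffset < BLOCK_SIZE) {
            block[readIntoOffset++] = split[off++];
          }
        }
        return block;
      }

      public static void main(String[] args) {
        // The last block starts mid-split 1 and spills into split 2. With the old
        // bound (endSplit == startSplit for the last block), rows 6 and 7 were
        // never read back; scanning through the last actual split recovers them.
        System.out.println(java.util.Arrays.toString(readBlock(1, 1, SPLITS.length - 1)));
        // prints [4, 5, 6, 7]
      }
    }

For non-last blocks, endSplit is the next block's starting split, which the two
blocks may share, so the readIntoOffset guard is what keeps a block from reading
past its own rows.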

http://git-wip-us.apache.org/repos/asf/hive/blob/8d524e06/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
index 4252bd1..68dc482 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
@@ -103,7 +103,7 @@ public class RowContainer<ROW extends List<Object>>
 
   boolean firstCalled = false; // once called first, it will never be able to
   // write again.
-  int acutalSplitNum = 0;
+  private int actualSplitNum = 0;
   int currentSplitPointer = 0;
   org.apache.hadoop.mapred.RecordReader rr = null; // record reader
   RecordWriter rw = null;
@@ -220,7 +220,7 @@ public class RowContainer<ROW extends List<Object>>
           HiveConf.setVar(localJc, HiveConf.ConfVars.HADOOPMAPREDINPUTDIR,
               org.apache.hadoop.util.StringUtils.escapeString(parentFile.getAbsolutePath()));
           inputSplits = inputFormat.getSplits(localJc, 1);
-          acutalSplitNum = inputSplits.length;
+          actualSplitNum = inputSplits.length;
         }
         currentSplitPointer = 0;
         rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer],
@@ -375,7 +375,7 @@ public class RowContainer<ROW extends List<Object>>
         }
       }
 
-      if (nextSplit && this.currentSplitPointer < this.acutalSplitNum) {
+      if (nextSplit && this.currentSplitPointer < this.actualSplitNum) {
         JobConf localJc = getLocalFSJobConfClone(jc);
         // open record reader to read next split
         rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer], jobCloneUsingLocalFs,
@@ -421,7 +421,7 @@ public class RowContainer<ROW extends List<Object>>
     addCursor = 0;
     numFlushedBlocks = 0;
     this.readBlockSize = 0;
-    this.acutalSplitNum = 0;
+    this.actualSplitNum = 0;
     this.currentSplitPointer = -1;
     this.firstCalled = false;
     this.inputSplits = null;
@@ -606,4 +606,8 @@ public class RowContainer<ROW extends List<Object>>
     clearRows();
     currentReadBlock = firstReadBlockPointer = currentWriteBlock = null;
   }
+
+  protected int getLastActualSplit() {
+    return actualSplitNum - 1;
+  }
 }
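
Besides correcting the spelling of acutalSplitNum, the hunk above narrows the
field to private and publishes the derived value through the new protected
getLastActualSplit(), which is what PTFRowContainer now uses as the last block's
endSplit. A small sketch of that encapsulation pattern, with placeholder class
names rather than the Hive classes:

    // Placeholder names; only the visibility pattern mirrors the hunk above.
    class Container {
      private int actualSplitNum = 0;       // private: subclasses cannot mutate it

      void open(int splitCount) {           // set once the splits are enumerated
        actualSplitNum = splitCount;
      }

      protected int getLastActualSplit() {  // read-only, derived view for subclasses
        return actualSplitNum - 1;
      }
    }

    class SpillingContainer extends Container {
      int endSplitForLastBlock() {
        return getLastActualSplit();        // no direct field access
      }
    }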

http://git-wip-us.apache.org/repos/asf/hive/blob/8d524e06/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestPTFRowContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestPTFRowContainer.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestPTFRowContainer.java
index a404ff0..0611072 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestPTFRowContainer.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestPTFRowContainer.java
@@ -18,12 +18,14 @@
 
 package org.apache.hadoop.hive.ql.exec.persistence;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -37,11 +39,13 @@ import org.apache.hadoop.io.Text;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
 @SuppressWarnings("deprecation")
 public class TestPTFRowContainer {
 
-  private static final String COL_NAMES = "x,y,z,a,b";
-  private static final String COL_TYPES = "int,string,double,int,string";
+  private static final String COL_NAMES = "x,y,z,a,b,v";
+  private static final String COL_TYPES = "int,string,double,int,string,string";
 
   static SerDe serDe;
   static Configuration cfg;
@@ -70,7 +74,7 @@ public class TestPTFRowContainer {
     return rc;
   }
 
-  private void runTest(int sz, int blockSize) throws SerDeException, HiveException {
+  private void runTest(int sz, int blockSize, String value) throws SerDeException, HiveException {
     List<Object> row;
 
     PTFRowContainer<List<Object>> rc = rowContainer(blockSize);
@@ -82,16 +86,17 @@ public class TestPTFRowContainer {
       row.add(new DoubleWritable(i));
       row.add(new IntWritable(i));
       row.add(new Text("def " + i));
+      row.add(new Text(value));
       rc.addRow(row);
     }
 
     // test forward scan
-    assert(rc.rowCount() == sz);
+    assertEquals(sz, rc.rowCount());
     i = 0;
     row = new ArrayList<Object>();
     row = rc.first();
     while(row != null ) {
-      assert(row.get(1).toString().equals("abc " + i));
+      assertEquals("abc " + i, row.get(1).toString());
       i++;
       row = rc.next();
     }
@@ -100,7 +105,7 @@ public class TestPTFRowContainer {
     row = rc.first();
     for(i = sz - 1; i >= 0; i-- ) {
       row = rc.getAt(i);
-      assert(row.get(1).toString().equals("abc " + i));
+      assertEquals("abc " + i, row.get(1).toString());
     }
 
     Random r = new Random(1000L);
@@ -109,20 +114,23 @@ public class TestPTFRowContainer {
     for(i=0; i < 100; i++) {
       int j = r.nextInt(sz);
       row = rc.getAt(j);
-      assert(row.get(1).toString().equals("abc " + j));
+      assertEquals("abc " + j, row.get(1).toString());
     }
 
     // intersperse getAt and next calls
     for(i=0; i < 100; i++) {
       int j = r.nextInt(sz);
       row = rc.getAt(j);
-      assert(row.get(1).toString().equals("abc " + j));
+      assertEquals("abc " + j, row.get(1).toString());
       for(int k = j + 1; k < j + (blockSize/4) && k < sz; k++) {
         row = rc.next();
-        assert(row.get(4).toString().equals("def " + k));
+        assertEquals("def " + k, row.get(4).toString());
       }
     }
+  }
 
+  private void runTest(int sz, int blockSize) throws SerDeException, HiveException {
+    runTest(sz, blockSize, "");
   }
 
   @Test
@@ -134,4 +142,9 @@ public class TestPTFRowContainer {
   public void testSmallBlockSize() throws SerDeException, HiveException {
     runTest(10 * 1000, 5);
   }
+
+  @Test
+  public void testBlocksLargerThanSplit() throws SerDeException, HiveException, IOException {
+    runTest(5, 2, new String(new char[(int)FileSystem.getLocal(cfg).getDefaultBlockSize()]));
+  }
 }
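
The new testBlocksLargerThanSplit case exercises the fix directly: the added
sixth column is padded with a string as long as the local file system's default
block size, so container blocks are forced to span split boundaries. A quick,
self-contained check of what that padding expression produces (standard Hadoop
calls, as used in the test itself; the class name is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    // new String(new char[n]) yields a String of n '\u0000' characters, so each
    // row serializes to at least one local-FS block and blocks must span splits.
    public class PaddingSizeDemo {
      public static void main(String[] args) throws Exception {
        long blockSize = FileSystem.getLocal(new Configuration()).getDefaultBlockSize();
        String padding = new String(new char[(int) blockSize]);
        System.out.println("padding length = " + padding.length()); // == blockSize
      }
    }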