Posted to commits@hive.apache.org by xu...@apache.org on 2014/10/17 01:56:10 UTC
svn commit: r1632448 - in /hive/branches/spark/ql/src: java/org/apache/hadoop/hive/ql/exec/spark/ test/org/apache/hadoop/hive/ql/exec/spark/
Author: xuefu
Date: Thu Oct 16 23:56:09 2014
New Revision: 1632448
URL: http://svn.apache.org/r1632448
Log:
HIVE-7873: Re-enable lazy HiveBaseFunctionResultList [Spark Branch] (Jimmy via Xuefu)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java
hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java?rev=1632448&r1=1632447&r2=1632448&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java Thu Oct 16 23:56:09 2014
@@ -17,19 +17,19 @@
*/
package org.apache.hadoop.hive.ql.exec.spark;
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.OutputCollector;
+
import scala.Tuple2;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
+import com.google.common.base.Preconditions;
/**
* Base class for
@@ -38,9 +38,10 @@ import java.util.NoSuchElementException;
* are processed in lazy fashion i.e when output records are requested
* through Iterator interface.
*/
+@SuppressWarnings("rawtypes")
public abstract class HiveBaseFunctionResultList<T> implements
Iterable, OutputCollector<HiveKey, BytesWritable>, Serializable {
-
+ private static final long serialVersionUID = -1L;
private final Iterator<T> inputIterator;
private boolean isClosed = false;
@@ -106,11 +107,9 @@ public abstract class HiveBaseFunctionRe
while (inputIterator.hasNext() && !processingDone()) {
try {
processNextRecord(inputIterator.next());
- // TODO Current HiveKVResultCache does not support read-then-write,
- // should not enable lazy execution here. See HIVE-7873
- // if (lastRecordOutput.hasNext()) {
- // return true;
- // }
+ if (lastRecordOutput.hasNext()) {
+ return true;
+ }
} catch (IOException ex) {
// TODO: better handling of exception.
throw new RuntimeException("Error while processing input.", ex);
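
[Editor's note for context: the re-enabled block above is the heart of the change. After processing a single input record, the iterator now returns true from its hasNext logic as soon as the result cache holds any output, instead of draining the entire input first. A minimal sketch of that lazy pattern, assuming a plain in-memory queue in place of HiveKVResultCache; the class and method names here are illustrative, not Hive APIs:

    import java.util.ArrayDeque;
    import java.util.Iterator;
    import java.util.NoSuchElementException;
    import java.util.Queue;

    // Simplified, hypothetical model of the lazy result-list pattern.
    abstract class LazyResultIterator<IN, OUT> implements Iterator<OUT> {
      private final Iterator<IN> input;
      protected final Queue<OUT> buffered = new ArrayDeque<OUT>();

      LazyResultIterator(Iterator<IN> input) { this.input = input; }

      // Subclasses emit zero or more outputs per input record into 'buffered'.
      protected abstract void processNextRecord(IN record);

      @Override
      public boolean hasNext() {
        while (buffered.isEmpty() && input.hasNext()) {
          processNextRecord(input.next());
          // The lazy step re-enabled by this commit: hand back output as
          // soon as any is available, rather than consuming all input first.
          if (!buffered.isEmpty()) {
            return true;
          }
        }
        return !buffered.isEmpty();
      }

      @Override
      public OUT next() {
        if (!hasNext()) throw new NoSuchElementException();
        return buffered.poll();
      }

      @Override
      public void remove() { throw new UnsupportedOperationException(); }
    }

This only works if the underlying cache tolerates new rows being added while it is still being read, which is exactly what the HiveKVResultCache change below provides.]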
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java?rev=1632448&r1=1632447&r2=1632448&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java Thu Oct 16 23:56:09 2014
@@ -17,7 +17,9 @@
*/
package org.apache.hadoop.hive.ql.exec.spark;
-import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
@@ -32,14 +34,17 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.Reporter;
+
import scala.Tuple2;
-import java.util.ArrayList;
-import java.util.List;
+import com.google.common.base.Preconditions;
/**
* Wrapper around {@link org.apache.hadoop.hive.ql.exec.persistence.RowContainer}
+ *
+ * This class is thread safe.
*/
+@SuppressWarnings({"deprecation", "unchecked", "rawtypes"})
public class HiveKVResultCache {
public static final int IN_MEMORY_CACHE_SIZE = 512;
@@ -47,14 +52,20 @@ public class HiveKVResultCache {
private static final String COL_TYPES =
serdeConstants.BINARY_TYPE_NAME + ":" + serdeConstants.BINARY_TYPE_NAME;
+ // Used to cache rows added while container is iterated.
+ private RowContainer backupContainer;
+
private RowContainer container;
+ private Configuration conf;
private int cursor = 0;
public HiveKVResultCache(Configuration conf) {
- initRowContainer(conf);
+ container = initRowContainer(conf);
+ this.conf = conf;
}
- private void initRowContainer(Configuration conf) {
+ private static RowContainer initRowContainer(Configuration conf) {
+ RowContainer container;
try {
container = new RowContainer(IN_MEMORY_CACHE_SIZE, conf, Reporter.NULL);
@@ -71,6 +82,7 @@ public class HiveKVResultCache {
} catch(Exception ex) {
throw new RuntimeException("Failed to create RowContainer", ex);
}
+ return container;
}
public void add(HiveKey key, BytesWritable value) {
@@ -80,14 +92,26 @@ public class HiveKVResultCache {
row.add(wrappedHiveKey);
row.add(value);
- try {
- container.addRow(row);
- } catch (HiveException ex) {
- throw new RuntimeException("Failed to add KV pair to RowContainer", ex);
+ synchronized (this) {
+ try {
+ if (cursor == 0) {
+ container.addRow(row);
+ } else {
+ if (backupContainer == null) {
+ backupContainer = initRowContainer(conf);
+ }
+ backupContainer.addRow(row);
+ }
+ } catch (HiveException ex) {
+ throw new RuntimeException("Failed to add KV pair to RowContainer", ex);
+ }
}
}
- public void clear() {
+ public synchronized void clear() {
+ if (cursor == 0) {
+ return;
+ }
try {
container.clearRows();
} catch(HiveException ex) {
@@ -96,21 +120,34 @@ public class HiveKVResultCache {
cursor = 0;
}
- public boolean hasNext() {
- return container.rowCount() > 0 && cursor < container.rowCount();
+ public synchronized boolean hasNext() {
+ if (container.rowCount() > 0 && cursor < container.rowCount()) {
+ return true;
+ }
+ if (backupContainer == null
+ || backupContainer.rowCount() == 0) {
+ return false;
+ }
+ clear();
+ // Switch containers
+ RowContainer tmp = container;
+ container = backupContainer;
+ backupContainer = tmp;
+ return true;
}
public Tuple2<HiveKey, BytesWritable> next() {
- Preconditions.checkState(hasNext());
-
try {
List<BytesWritable> row;
- if (cursor == 0) {
- row = container.first();
- } else {
- row = container.next();
+ synchronized (this) {
+ Preconditions.checkState(hasNext());
+ if (cursor == 0) {
+ row = container.first();
+ } else {
+ row = container.next();
+ }
+ cursor++;
}
- cursor++;
HiveKey key = KryoSerializer.deserialize(row.get(0).getBytes(), HiveKey.class);
return new Tuple2<HiveKey, BytesWritable>(key, row.get(1));
} catch (HiveException ex) {
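
[Editor's note for context: the fix above is a two-container scheme. While the primary container is mid-iteration (cursor != 0), add() parks new rows in backupContainer; once the primary is exhausted, hasNext() clears it and swaps the two, so read-then-write no longer corrupts iteration state. A rough generic sketch of the same idea, assuming simple list-backed containers in place of RowContainer; names are illustrative:

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative sketch of the swap-on-exhaustion scheme, not the Hive class.
    class TwoBufferCache<T> {
      private List<T> container = new ArrayList<T>();
      private List<T> backupContainer;  // rows added while 'container' is read
      private int cursor = 0;

      public synchronized void add(T row) {
        if (cursor == 0) {
          container.add(row);           // not mid-iteration: append directly
        } else {
          if (backupContainer == null) {
            backupContainer = new ArrayList<T>();
          }
          backupContainer.add(row);     // mid-iteration: park the row aside
        }
      }

      public synchronized boolean hasNext() {
        if (cursor < container.size()) {
          return true;
        }
        if (backupContainer == null || backupContainer.isEmpty()) {
          return false;
        }
        // Primary exhausted: discard its rows and promote the backup.
        container.clear();
        cursor = 0;
        List<T> tmp = container;
        container = backupContainer;
        backupContainer = tmp;
        return true;
      }

      public synchronized T next() {
        if (!hasNext()) throw new IllegalStateException();
        return container.get(cursor++);
      }
    }

The synchronized methods mirror the locking added in the patch: reads and writes may now come from different threads, as the new test below exercises.]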
Modified: hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java?rev=1632448&r1=1632447&r2=1632448&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java (original)
+++ hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java Thu Oct 16 23:56:09 2014
@@ -17,14 +17,27 @@
*/
package org.apache.hadoop.hive.ql.exec.spark;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.io.BytesWritable;
import org.junit.Test;
+
import scala.Tuple2;
-import static org.junit.Assert.assertTrue;
+import com.clearspring.analytics.util.Preconditions;
+@SuppressWarnings({"unchecked", "rawtypes"})
public class TestHiveKVResultCache {
@Test
public void testSimple() throws Exception {
@@ -87,4 +100,237 @@ public class TestHiveKVResultCache {
cache.clear();
}
-}
\ No newline at end of file
+
+ @Test
+ public void testResultList() throws Exception {
+ scanAndVerify(10000, 0, 0, "a", "b");
+ scanAndVerify(10000, 512, 0, "a", "b");
+ scanAndVerify(10000, 512 * 2, 0, "a", "b");
+ scanAndVerify(10000, 512, 10, "a", "b");
+ scanAndVerify(10000, 512 * 2, 10, "a", "b");
+ }
+
+ private static void scanAndVerify(
+ long rows, int threshold, int separate, String prefix1, String prefix2) {
+ ArrayList<Tuple2<HiveKey, BytesWritable>> output =
+ new ArrayList<Tuple2<HiveKey, BytesWritable>>((int)rows);
+ scanResultList(rows, threshold, separate, output, prefix1, prefix2);
+ assertEquals(rows, output.size());
+ long primaryRows = rows * (100 - separate) / 100;
+ long separateRows = rows - primaryRows;
+ HashSet<Long> primaryRowKeys = new HashSet<Long>();
+ HashSet<Long> separateRowKeys = new HashSet<Long>();
+ for (Tuple2<HiveKey, BytesWritable> item: output) {
+ String key = new String(item._1.copyBytes());
+ String value = new String(item._2.copyBytes());
+ String prefix = key.substring(0, key.indexOf('_'));
+ Long id = Long.valueOf(key.substring(5 + prefix.length()));
+ if (prefix.equals(prefix1)) {
+ assertTrue(id >= 0 && id < primaryRows);
+ primaryRowKeys.add(id);
+ } else {
+ assertEquals(prefix2, prefix);
+ assertTrue(id >= 0 && id < separateRows);
+ separateRowKeys.add(id);
+ }
+ assertEquals(prefix + "_value_" + id, value);
+ }
+ assertEquals(separateRows, separateRowKeys.size());
+ assertEquals(primaryRows, primaryRowKeys.size());
+ }
+
+ private static class MyHiveFunctionResultList extends HiveBaseFunctionResultList {
+ private static final long serialVersionUID = -1L;
+
+ // Total rows to emit during the whole iteration,
+ // excluding the rows emitted by the separate thread.
+ private long primaryRows;
+ // Batch of rows to emit per processNextRecord() call.
+ private int thresholdRows;
+ // Rows to be emitted with a separate thread per processNextRecord() call.
+ private long separateRows;
+ // Thread to generate the separate rows beside the normal thread.
+ private Thread separateRowGenerator;
+
+ // Counter for rows emitted
+ private long rowsEmitted;
+ private long separateRowsEmitted;
+
+ // Prefix for primary row keys
+ private String prefix1;
+ // Prefix for separate row keys
+ private String prefix2;
+
+ // A queue to notify separateRowGenerator to generate the next batch of rows.
+ private LinkedBlockingQueue<Boolean> queue;
+
+ MyHiveFunctionResultList(Configuration conf, Iterator inputIterator) {
+ super(conf, inputIterator);
+ }
+
+ void init(long rows, int threshold, int separate, String p1, String p2) {
+ Preconditions.checkArgument((threshold > 0 || separate == 0)
+ && separate < 100 && separate >= 0 && rows > 0);
+ primaryRows = rows * (100 - separate) / 100;
+ separateRows = rows - primaryRows;
+ thresholdRows = threshold;
+ prefix1 = p1;
+ prefix2 = p2;
+ if (separateRows > 0) {
+ separateRowGenerator = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ long separateBatchSize = thresholdRows * separateRows / primaryRows;
+ while (!queue.take().booleanValue()) {
+ for (int i = 0; i < separateBatchSize; i++) {
+ collect(prefix2, separateRowsEmitted++);
+ }
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ for (; separateRowsEmitted < separateRows;) {
+ collect(prefix2, separateRowsEmitted++);
+ }
+ }
+ });
+ queue = new LinkedBlockingQueue<Boolean>();
+ separateRowGenerator.start();
+ }
+ }
+
+ public void collect(String prefix, long id) {
+ String k = prefix + "_key_" + id;
+ String v = prefix + "_value_" + id;
+ HiveKey key = new HiveKey(k.getBytes(), k.hashCode());
+ BytesWritable value = new BytesWritable(v.getBytes());
+ try {
+ collect(key, value);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ protected void processNextRecord(Object inputRecord) throws IOException {
+ for (int i = 0; i < thresholdRows; i++) {
+ collect(prefix1, rowsEmitted++);
+ }
+ if (separateRowGenerator != null) {
+ queue.add(Boolean.FALSE);
+ }
+ }
+
+ @Override
+ protected boolean processingDone() {
+ return false;
+ }
+
+ @Override
+ protected void closeRecordProcessor() {
+ for (; rowsEmitted < primaryRows;) {
+ collect(prefix1, rowsEmitted++);
+ }
+ if (separateRowGenerator != null) {
+ queue.add(Boolean.TRUE);
+ try {
+ separateRowGenerator.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ private static long scanResultList(long rows, int threshold, int separate,
+ List<Tuple2<HiveKey, BytesWritable>> output, String prefix1, String prefix2) {
+ final long iteratorCount = threshold == 0 ? 1 : rows * (100 - separate) / 100 / threshold;
+ MyHiveFunctionResultList resultList = new MyHiveFunctionResultList(
+ new HiveConf(), new Iterator() {
+ // Input record iterator, not used
+ private int i = 0;
+ @Override
+ public boolean hasNext() {
+ return i++ < iteratorCount;
+ }
+
+ @Override
+ public Object next() {
+ return Integer.valueOf(i);
+ }
+
+ @Override
+ public void remove() {
+ }
+ });
+
+ resultList.init(rows, threshold, separate, prefix1, prefix2);
+ long startTime = System.currentTimeMillis();
+ Iterator it = resultList.iterator();
+ while (it.hasNext()) {
+ Object item = it.next();
+ if (output != null) {
+ output.add((Tuple2<HiveKey, BytesWritable>)item);
+ }
+ }
+ long endTime = System.currentTimeMillis();
+ return endTime - startTime;
+ }
+
+ private static long[] scanResultList(long rows, int threshold, int extra) {
+ // 1. Simulate emitting all records in closeRecordProcessor().
+ long t1 = scanResultList(rows, 0, 0, null, "a", "b");
+
+ // 2. Simulate emitting records in processNextRecord() with small memory usage limit.
+ long t2 = scanResultList(rows, threshold, 0, null, "c", "d");
+
+ // 3. Simulate emitting records in processNextRecord() with large memory usage limit.
+ long t3 = scanResultList(rows, threshold * 10, 0, null, "e", "f");
+
+ // 4. Same as 2. Also emit extra records from a separate thread.
+ long t4 = scanResultList(rows, threshold, extra, null, "g", "h");
+
+ // 5. Same as 3. Also emit extra records from a separate thread.
+ long t5 = scanResultList(rows, threshold * 10, extra, null, "i", "j");
+
+ return new long[] {t1, t2, t3, t4, t5};
+ }
+
+ public static void main(String[] args) throws Exception {
+ long rows = 1000000; // total rows to generate
+ int threshold = 512; // # of rows to cache at most
+ int extra = 5; // percentile of extra rows to generate by a different thread
+
+ if (args.length > 0) {
+ rows = Long.parseLong(args[0]);
+ }
+ if (args.length > 1) {
+ threshold = Integer.parseInt(args[1]);
+ }
+ if (args.length > 2) {
+ extra = Integer.parseInt(args[2]);
+ }
+
+ // Warm up couple times
+ for (int i = 0; i < 2; i++) {
+ scanResultList(rows, threshold, extra);
+ }
+
+ int count = 5;
+ long[] t = new long[count];
+ // Run count times and get average
+ for (int i = 0; i < count; i++) {
+ long[] tmp = scanResultList(rows, threshold, extra);
+ for (int k = 0; k < count; k++) {
+ t[k] += tmp[k];
+ }
+ }
+ for (int i = 0; i < count; i++) {
+ t[i] /= count;
+ }
+
+ System.out.println(t[0] + "\t" + t[1] + "\t" + t[2]
+ + "\t" + t[3] + "\t" + t[4]);
+ }
+}
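
[Editor's note for context: the new main() doubles as a small standalone benchmark. It accepts up to three positional arguments (total rows to generate, cache threshold, and the percentage of rows emitted from a separate thread), runs the five scanResultList scenarios five times after two warm-up passes, and prints the average elapsed milliseconds per scenario. A hypothetical invocation, with the classpath left as a placeholder since it depends on the build layout:

    java -cp <hive-ql-test-classes-and-deps> org.apache.hadoop.hive.ql.exec.spark.TestHiveKVResultCache 1000000 512 5
]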