Posted to commits@hive.apache.org by ek...@apache.org on 2015/10/05 20:07:04 UTC
hive git commit: HIVE-11983 - Hive streaming API uses incorrect logic to assign buckets to incoming records (Roshan Naik via Eugene Koifman)
Repository: hive
Updated Branches:
refs/heads/master 8964c1ebc -> 0ca9ff865
HIVE-11983 - Hive streaming API uses incorrect logic to assign buckets to incoming records (Roshan Naik via Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0ca9ff86
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0ca9ff86
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0ca9ff86
Branch: refs/heads/master
Commit: 0ca9ff86514f5e76cb0cd99022d8d5d1cf39e626
Parents: 8964c1e
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Mon Oct 5 11:06:58 2015 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Mon Oct 5 11:06:58 2015 -0700
----------------------------------------------------------------------
hcatalog/streaming/pom.xml | 7 +
.../streaming/AbstractRecordWriter.java | 93 ++-
.../streaming/DelimitedInputWriter.java | 54 +-
.../hcatalog/streaming/StrictJsonWriter.java | 46 +-
.../hive/hcatalog/streaming/TestStreaming.java | 698 +++++++++++++++----
.../objectinspector/ObjectInspectorUtils.java | 29 +
6 files changed, 773 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
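For context on the fix below: each record is now assigned to a bucket computed from a hash of its bucketing-column values, rather than a single random bucket chosen per transaction batch. A minimal, self-contained sketch of that scheme follows (it hashes with plain Object.hashCode for illustration; the patch itself hashes through each column's ObjectInspector via ObjectInspectorUtils.hashCode, so actual bucket ids will differ):

// Sketch of the bucket-assignment scheme introduced by this patch:
// accumulate per-column hashes with a 31 multiplier, then map into [0, totalBuckets).
public class BucketAssignmentSketch {
  static int bucketFor(Object[] bucketFields, int totalBuckets) {
    int hashCode = 0;
    for (Object field : bucketFields) {
      hashCode = 31 * hashCode + (field == null ? 0 : field.hashCode());
    }
    return (hashCode & Integer.MAX_VALUE) % totalBuckets;   // mask keeps the index non-negative
  }

  public static void main(String[] args) {
    // e.g. a table clustered by (key1, key2) into 4 buckets; the sample values are made up
    System.out.println(bucketFor(new Object[]{"name0", 1}, 4));
  }
}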
http://git-wip-us.apache.org/repos/asf/hive/blob/0ca9ff86/hcatalog/streaming/pom.xml
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/pom.xml b/hcatalog/streaming/pom.xml
index 6d03ce1..ba9f731 100644
--- a/hcatalog/streaming/pom.xml
+++ b/hcatalog/streaming/pom.xml
@@ -104,6 +104,13 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ <scope>test</scope>
+ <version>${hadoop-23.version}</version>
+ </dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/hive/blob/0ca9ff86/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
index c959222..a2cd2f5 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -32,13 +33,20 @@ import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.RecordUpdater;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hive.hcatalog.common.HCatUtil;
import org.apache.thrift.TException;
import java.io.IOException;
-import java.util.Random;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
abstract class AbstractRecordWriter implements RecordWriter {
static final private Log LOG = LogFactory.getLog(AbstractRecordWriter.class.getName());
@@ -48,14 +56,15 @@ abstract class AbstractRecordWriter implements RecordWriter {
final Table tbl;
final IMetaStoreClient msClient;
- RecordUpdater updater = null;
+ protected final List<Integer> bucketIds;
+ ArrayList<RecordUpdater> updaters = null;
+
+ public final int totalBuckets;
- private final int totalBuckets;
- private Random rand = new Random();
- private int currentBucketId = 0;
private final Path partitionPath;
final AcidOutputFormat<?,?> outf;
+ private Object[] bucketFieldData; // Pre-allocated in constructor. Updated on each write.
protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf)
throws ConnectionError, StreamingException {
@@ -71,8 +80,11 @@ abstract class AbstractRecordWriter implements RecordWriter {
throw new StreamingException("Cannot stream to table that has not been bucketed : "
+ endPoint);
}
+ this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols()) ;
+ this.bucketFieldData = new Object[bucketIds.size()];
String outFormatName = this.tbl.getSd().getOutputFormat();
outf = (AcidOutputFormat<?,?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf);
+ bucketFieldData = new Object[bucketIds.size()];
} catch (MetaException e) {
throw new ConnectionError(endPoint, e);
} catch (NoSuchObjectException e) {
@@ -86,17 +98,37 @@ abstract class AbstractRecordWriter implements RecordWriter {
}
}
- protected AbstractRecordWriter(HiveEndPoint endPoint)
- throws ConnectionError, StreamingException {
- this(endPoint, HiveEndPoint.createHiveConf(AbstractRecordWriter.class, endPoint.metaStoreUri) );
+ // return the column numbers of the bucketed columns
+ private List<Integer> getBucketColIDs(List<String> bucketCols, List<FieldSchema> cols) {
+ ArrayList<Integer> result = new ArrayList<Integer>(bucketCols.size());
+ HashSet<String> bucketSet = new HashSet<String>(bucketCols);
+ for (int i = 0; i < cols.size(); i++) {
+ if( bucketSet.contains(cols.get(i).getName()) ) {
+ result.add(i);
+ }
+ }
+ return result;
}
abstract SerDe getSerde() throws SerializationError;
+ protected abstract ObjectInspector[] getBucketObjectInspectors();
+ protected abstract StructObjectInspector getRecordObjectInspector();
+ protected abstract StructField[] getBucketStructFields();
+
+ // returns the bucket number to which the record belongs to
+ protected int getBucket(Object row) throws SerializationError {
+ ObjectInspector[] inspectors = getBucketObjectInspectors();
+ Object[] bucketFields = getBucketFields(row);
+ return ObjectInspectorUtils.getBucketNumber(bucketFields, inspectors, totalBuckets);
+ }
+
@Override
public void flush() throws StreamingIOFailure {
try {
- updater.flush();
+ for (RecordUpdater updater : updaters) {
+ updater.flush();
+ }
} catch (IOException e) {
throw new StreamingIOFailure("Unable to flush recordUpdater", e);
}
@@ -116,9 +148,8 @@ abstract class AbstractRecordWriter implements RecordWriter {
public void newBatch(Long minTxnId, Long maxTxnID)
throws StreamingIOFailure, SerializationError {
try {
- this.currentBucketId = rand.nextInt(totalBuckets);
LOG.debug("Creating Record updater");
- updater = createRecordUpdater(currentBucketId, minTxnId, maxTxnID);
+ updaters = createRecordUpdaters(totalBuckets, minTxnId, maxTxnID);
} catch (IOException e) {
LOG.error("Failed creating record updater", e);
throw new StreamingIOFailure("Unable to get new record Updater", e);
@@ -128,13 +159,49 @@ abstract class AbstractRecordWriter implements RecordWriter {
@Override
public void closeBatch() throws StreamingIOFailure {
try {
- updater.close(false);
- updater = null;
+ for (RecordUpdater updater : updaters) {
+ updater.close(false);
+ }
+ updaters.clear();
} catch (IOException e) {
throw new StreamingIOFailure("Unable to close recordUpdater", e);
}
}
+ protected static ObjectInspector[] getObjectInspectorsForBucketedCols(List<Integer> bucketIds
+ , StructObjectInspector recordObjInspector)
+ throws SerializationError {
+ ObjectInspector[] result = new ObjectInspector[bucketIds.size()];
+
+ for (int i = 0; i < bucketIds.size(); i++) {
+ int bucketId = bucketIds.get(i);
+ result[i] =
+ recordObjInspector.getAllStructFieldRefs().get( bucketId ).getFieldObjectInspector();
+ }
+ return result;
+ }
+
+
+ private Object[] getBucketFields(Object row) throws SerializationError {
+ StructObjectInspector recordObjInspector = getRecordObjectInspector();
+ StructField[] bucketStructFields = getBucketStructFields();
+ for (int i = 0; i < bucketIds.size(); i++) {
+ bucketFieldData[i] = recordObjInspector.getStructFieldData(row, bucketStructFields[i]);
+ }
+ return bucketFieldData;
+ }
+
+
+
+ private ArrayList<RecordUpdater> createRecordUpdaters(int bucketCount, Long minTxnId, Long maxTxnID)
+ throws IOException, SerializationError {
+ ArrayList<RecordUpdater> result = new ArrayList<RecordUpdater>(bucketCount);
+ for (int bucket = 0; bucket < bucketCount; bucket++) {
+ result.add(createRecordUpdater(bucket, minTxnId, maxTxnID) );
+ }
+ return result;
+ }
+
private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxTxnID)
throws IOException, SerializationError {
try {
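The net effect of the AbstractRecordWriter changes above: instead of one RecordUpdater bound to a random bucket per batch, newBatch() now opens one updater per bucket and each write routes to updaters.get(bucket). A toy, self-contained model of that routing (the Updater interface and the printing updaters are stand-ins for illustration, not Hive classes):

import java.util.ArrayList;
import java.util.List;

// Toy model of the one-updater-per-bucket routing added above.
public class PerBucketRoutingSketch {
  interface Updater { void insert(long txnId, String row); }

  // newBatch(): pre-create one updater per bucket
  static List<Updater> openBatch(int totalBuckets) {
    List<Updater> updaters = new ArrayList<Updater>(totalBuckets);
    for (int b = 0; b < totalBuckets; b++) {
      final int bucket = b;
      updaters.add(new Updater() {
        public void insert(long txnId, String row) {
          System.out.println("txn " + txnId + " -> bucket " + bucket + ": " + row);
        }
      });
    }
    return updaters;
  }

  public static void main(String[] args) {
    int totalBuckets = 4;
    List<Updater> updaters = openBatch(totalBuckets);
    String row = "name0,1,Hello streaming";                        // sample record from the tests
    int hash = 31 * "name0".hashCode() + Integer.valueOf(1).hashCode(); // stand-in for getBucket(row)
    int bucket = (hash & Integer.MAX_VALUE) % totalBuckets;
    updaters.get(bucket).insert(1L, row);                          // write() routes by bucket id
  }
}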
http://git-wip-us.apache.org/repos/asf/hive/blob/0ca9ff86/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
index 6dc69f0..fd36a38 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
@@ -26,12 +26,14 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.io.BytesWritable;
import java.io.IOException;
@@ -51,7 +53,11 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
private char serdeSeparator;
private int[] fieldToColMapping;
private final ArrayList<String> tableColumns;
- private AbstractSerDe serde = null;
+ private LazySimpleSerDe serde = null;
+
+ private final LazySimpleStructObjectInspector recordObjInspector;
+ private final ObjectInspector[] bucketObjInspectors;
+ private final StructField[] bucketStructFields;
static final private Log LOG = LogFactory.getLog(DelimitedInputWriter.class.getName());
@@ -120,6 +126,22 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
this.reorderingNeeded = isReorderingNeeded(delimiter, getTableColumns());
LOG.debug("Field reordering needed = " + this.reorderingNeeded + ", for endpoint " + endPoint);
this.serdeSeparator = serdeSeparator;
+ this.serde = createSerde(tbl, conf, serdeSeparator);
+
+ // get ObjInspectors for entire record and bucketed cols
+ try {
+ this.recordObjInspector = (LazySimpleStructObjectInspector) serde.getObjectInspector();
+ this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
+ } catch (SerDeException e) {
+ throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
+ }
+
+ // get StructFields for bucketed cols
+ bucketStructFields = new StructField[bucketIds.size()];
+ List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
+ for (int i = 0; i < bucketIds.size(); i++) {
+ bucketStructFields[i] = allFields.get(bucketIds.get(i));
+ }
}
private boolean isReorderingNeeded(String delimiter, ArrayList<String> tableColumns) {
@@ -173,14 +195,14 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
}
String[] reorderedFields = new String[getTableColumns().size()];
String decoded = new String(record);
- String[] fields = decoded.split(delimiter);
+ String[] fields = decoded.split(delimiter,-1);
for (int i=0; i<fieldToColMapping.length; ++i) {
int newIndex = fieldToColMapping[i];
if(newIndex != -1) {
reorderedFields[newIndex] = fields[i];
}
}
- return join(reorderedFields,getSerdeSeparator());
+ return join(reorderedFields, getSerdeSeparator());
}
// handles nulls in items[]
@@ -212,7 +234,8 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
try {
byte[] orderedFields = reorderFields(record);
Object encodedRow = encode(orderedFields);
- updater.insert(transactionId, encodedRow);
+ int bucket = getBucket(encodedRow);
+ updaters.get(bucket).insert(transactionId, encodedRow);
} catch (IOException e) {
throw new StreamingIOFailure("Error writing record in transaction ("
+ transactionId + ")", e);
@@ -221,13 +244,22 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
@Override
SerDe getSerde() throws SerializationError {
- if(serde!=null) {
- return serde;
- }
- serde = createSerde(tbl, conf);
return serde;
}
+ protected LazySimpleStructObjectInspector getRecordObjectInspector() {
+ return recordObjInspector;
+ }
+
+ @Override
+ protected StructField[] getBucketStructFields() {
+ return bucketStructFields;
+ }
+
+ protected ObjectInspector[] getBucketObjectInspectors() {
+ return bucketObjInspectors;
+ }
+
private Object encode(byte[] record) throws SerializationError {
try {
BytesWritable blob = new BytesWritable();
@@ -244,7 +276,7 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
* @throws SerializationError if serde could not be initialized
* @param tbl
*/
- protected LazySimpleSerDe createSerde(Table tbl, HiveConf conf)
+ protected static LazySimpleSerDe createSerde(Table tbl, HiveConf conf, char serdeSeparator)
throws SerializationError {
try {
Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
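One detail worth calling out from the DelimitedInputWriter change: decoded.split(delimiter, -1) uses the negative limit so trailing empty fields are kept, keeping the field count aligned with the table columns for records that end in empty values. A quick illustration with a made-up record:

// Why split(delimiter, -1): the default limit drops trailing empty strings.
public class SplitLimitSketch {
  public static void main(String[] args) {
    String record = "7,Hello,,";                        // hypothetical 4-column record
    System.out.println(record.split(",").length);       // prints 2 -- trailing empties discarded
    System.out.println(record.split(",", -1).length);   // prints 4 -- all fields preserved
  }
}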
http://git-wip-us.apache.org/repos/asf/hive/blob/0ca9ff86/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
index 6d6beb8..6ab21eb 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
@@ -24,10 +24,14 @@ import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.io.Text;
+import org.apache.hive.hcatalog.data.HCatRecordObjectInspector;
import org.apache.hive.hcatalog.data.JsonSerDe;
import java.io.IOException;
+import java.util.List;
import java.util.Properties;
/**
@@ -37,6 +41,10 @@ import java.util.Properties;
public class StrictJsonWriter extends AbstractRecordWriter {
private JsonSerDe serde;
+ private final HCatRecordObjectInspector recordObjInspector;
+ private final ObjectInspector[] bucketObjInspectors;
+ private final StructField[] bucketStructFields;
+
/**
*
* @param endPoint the end point to write to
@@ -46,7 +54,7 @@ public class StrictJsonWriter extends AbstractRecordWriter {
*/
public StrictJsonWriter(HiveEndPoint endPoint)
throws ConnectionError, SerializationError, StreamingException {
- super(endPoint, null);
+ this(endPoint, null);
}
/**
@@ -60,23 +68,49 @@ public class StrictJsonWriter extends AbstractRecordWriter {
public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf)
throws ConnectionError, SerializationError, StreamingException {
super(endPoint, conf);
+ this.serde = createSerde(tbl, conf);
+ // get ObjInspectors for entire record and bucketed cols
+ try {
+ recordObjInspector = ( HCatRecordObjectInspector ) serde.getObjectInspector();
+ this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
+ } catch (SerDeException e) {
+ throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
+ }
+
+ // get StructFields for bucketed cols
+ bucketStructFields = new StructField[bucketIds.size()];
+ List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
+ for (int i = 0; i < bucketIds.size(); i++) {
+ bucketStructFields[i] = allFields.get(bucketIds.get(i));
+ }
}
@Override
SerDe getSerde() throws SerializationError {
- if(serde!=null) {
- return serde;
- }
- serde = createSerde(tbl, conf);
return serde;
}
+ protected HCatRecordObjectInspector getRecordObjectInspector() {
+ return recordObjInspector;
+ }
+
+ @Override
+ protected StructField[] getBucketStructFields() {
+ return bucketStructFields;
+ }
+
+ protected ObjectInspector[] getBucketObjectInspectors() {
+ return bucketObjInspectors;
+ }
+
+
@Override
public void write(long transactionId, byte[] record)
throws StreamingIOFailure, SerializationError {
try {
Object encodedRow = encode(record);
- updater.insert(transactionId, encodedRow);
+ int bucket = getBucket(encodedRow);
+ updaters.get(bucket).insert(transactionId, encodedRow);
} catch (IOException e) {
throw new StreamingIOFailure("Error writing record in transaction("
+ transactionId + ")", e);
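Also worth noting in StrictJsonWriter: the one-argument constructor now chains to this(endPoint, null) instead of calling super directly, so the serde/ObjectInspector setup added to the two-argument constructor runs for both entry points. A trivial sketch of that pattern (names are illustrative only):

// Constructor chaining so shared setup runs regardless of which constructor is used.
public class DelegatingWriterSketch {
  private final String inspectorState;

  public DelegatingWriterSketch(String endPoint) {
    this(endPoint, null);                                 // was super(...); now runs the full setup
  }

  public DelegatingWriterSketch(String endPoint, Object conf) {
    this.inspectorState = "inspectors initialized for " + endPoint;  // stands in for serde setup
  }

  public static void main(String[] args) {
    System.out.println(new DelegatingWriterSketch("db.table").inspectorState);
  }
}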
http://git-wip-us.apache.org/repos/asf/hive/blob/0ca9ff86/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index c28d4aa..2f6baec 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -18,33 +18,38 @@
package org.apache.hive.hcatalog.streaming;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
-import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
-import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
-import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService;
import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
@@ -68,7 +73,6 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -118,6 +122,7 @@ public class TestStreaming {
private static final String COL2 = "msg";
private final HiveConf conf;
+ private Driver driver;
private final IMetaStoreClient msClient;
final String metaStoreURI = null;
@@ -127,13 +132,23 @@ public class TestStreaming {
private final static String tblName = "alerts";
private final static String[] fieldNames = new String[]{COL1,COL2};
List<String> partitionVals;
- private static String partLocation;
+ private static Path partLoc;
+ private static Path partLoc2;
// unpartitioned table
- private final static String dbName2 = "testing";
+ private final static String dbName2 = "testing2";
private final static String tblName2 = "alerts";
private final static String[] fieldNames2 = new String[]{COL1,COL2};
+
+ // for bucket join testing
+ private final static String dbName3 = "testing3";
+ private final static String tblName3 = "dimensionTable";
+ private final static String dbName4 = "testing4";
+ private final static String tblName4 = "factTable";
+ List<String> partitionVals2;
+
+
private final String PART1_CONTINENT = "Asia";
private final String PART1_COUNTRY = "India";
@@ -146,14 +161,21 @@ public class TestStreaming {
partitionVals.add(PART1_CONTINENT);
partitionVals.add(PART1_COUNTRY);
+ partitionVals2 = new ArrayList<String>(1);
+ partitionVals2.add(PART1_COUNTRY);
+
+
conf = new HiveConf(this.getClass());
conf.set("fs.raw.impl", RawFileSystem.class.getName());
+ conf.set("hive.enforce.bucketing", "true");
TxnDbUtil.setConfValues(conf);
if (metaStoreURI!=null) {
conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI);
}
conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+ dbFolder.create();
+
//1) Start from a clean slate (metastore)
TxnDbUtil.cleanDb();
@@ -165,17 +187,35 @@ public class TestStreaming {
@Before
public void setup() throws Exception {
+ SessionState.start(new CliSessionState(conf));
+ driver = new Driver(conf);
+ driver.setMaxRows(200002);//make sure Driver returns all results
// drop and recreate the necessary databases and tables
dropDB(msClient, dbName);
- createDbAndTable(msClient, dbName, tblName, partitionVals);
+
+ String[] colNames = new String[] {COL1, COL2};
+ String[] colTypes = new String[] {serdeConstants.INT_TYPE_NAME, serdeConstants.STRING_TYPE_NAME};
+ String[] bucketCols = new String[] {COL1};
+ String loc1 = dbFolder.newFolder(dbName + ".db").toString();
+ String[] partNames = new String[]{"Continent", "Country"};
+ partLoc = createDbAndTable(driver, dbName, tblName, partitionVals, colNames, colTypes, bucketCols, partNames, loc1, 1);
dropDB(msClient, dbName2);
- createDbAndTable(msClient, dbName2, tblName2, partitionVals);
+ String loc2 = dbFolder.newFolder(dbName2 + ".db").toString();
+ partLoc2 = createDbAndTable(driver, dbName2, tblName2, partitionVals, colNames, colTypes, bucketCols, partNames, loc2, 2);
+
+ String loc3 = dbFolder.newFolder("testing5.db").toString();
+ createStoreSales("testing5", loc3);
+
+ runDDL(driver, "drop table testBucketing3.streamedtable");
+ runDDL(driver, "drop table testBucketing3.finaltable");
+ runDDL(driver, "drop table testBucketing3.nobucket");
}
@After
public void cleanup() throws Exception {
msClient.close();
+ driver.close();
}
private static List<FieldSchema> getPartitionKeys() {
@@ -186,10 +226,170 @@ public class TestStreaming {
return fields;
}
- private void checkDataWritten(long minTxn, long maxTxn, int buckets, int numExpectedFiles,
+ private void createStoreSales(String dbName, String loc) throws Exception {
+ String dbUri = "raw://" + new Path(loc).toUri().toString();
+ String tableLoc = dbUri + Path.SEPARATOR + "store_sales";
+
+ boolean success = runDDL(driver, "create database IF NOT EXISTS " + dbName + " location '" + dbUri + "'");
+ Assert.assertTrue(success);
+ success = runDDL(driver, "use " + dbName);
+ Assert.assertTrue(success);
+
+ success = runDDL(driver, "drop table if exists store_sales");
+ Assert.assertTrue(success);
+ success = runDDL(driver, "create table store_sales\n" +
+ "(\n" +
+ " ss_sold_date_sk int,\n" +
+ " ss_sold_time_sk int,\n" +
+ " ss_item_sk int,\n" +
+ " ss_customer_sk int,\n" +
+ " ss_cdemo_sk int,\n" +
+ " ss_hdemo_sk int,\n" +
+ " ss_addr_sk int,\n" +
+ " ss_store_sk int,\n" +
+ " ss_promo_sk int,\n" +
+ " ss_ticket_number int,\n" +
+ " ss_quantity int,\n" +
+ " ss_wholesale_cost decimal(7,2),\n" +
+ " ss_list_price decimal(7,2),\n" +
+ " ss_sales_price decimal(7,2),\n" +
+ " ss_ext_discount_amt decimal(7,2),\n" +
+ " ss_ext_sales_price decimal(7,2),\n" +
+ " ss_ext_wholesale_cost decimal(7,2),\n" +
+ " ss_ext_list_price decimal(7,2),\n" +
+ " ss_ext_tax decimal(7,2),\n" +
+ " ss_coupon_amt decimal(7,2),\n" +
+ " ss_net_paid decimal(7,2),\n" +
+ " ss_net_paid_inc_tax decimal(7,2),\n" +
+ " ss_net_profit decimal(7,2)\n" +
+ ")\n" +
+ " partitioned by (dt string)\n" +
+ "clustered by (ss_store_sk, ss_promo_sk)\n" +
+ "INTO 4 BUCKETS stored as orc " + " location '" + tableLoc + "'" + " TBLPROPERTIES ('orc.compress'='NONE', 'transactional'='true')");
+ Assert.assertTrue(success);
+
+ success = runDDL(driver, "alter table store_sales add partition(dt='2015')");
+ Assert.assertTrue(success);
+ }
+ /**
+ * make sure it works with table where bucket col is not 1st col
+ * @throws Exception
+ */
+ @Test
+ public void testBucketingWhereBucketColIsNotFirstCol() throws Exception {
+ List<String> partitionVals = new ArrayList<String>();
+ partitionVals.add("2015");
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testing5", "store_sales", partitionVals);
+ DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"ss_sold_date_sk","ss_sold_time_sk", "ss_item_sk",
+ "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", "ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity",
+ "ss_wholesale_cost", "ss_list_price", "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price", "ss_ext_wholesale_cost",
+ "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", "ss_net_profit"},",", endPt);
+ StreamingConnection connection = endPt.newConnection(false, null);//should this really be null?
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
+ txnBatch.beginNextTransaction();
+
+ StringBuilder row = new StringBuilder();
+ for(int i = 0; i < 10; i++) {
+ for(int ints = 0; ints < 11; ints++) {
+ row.append(ints).append(',');
+ }
+ for(int decs = 0; decs < 12; decs++) {
+ row.append(i + 0.1).append(',');
+ }
+ row.setLength(row.length() - 1);
+ txnBatch.write(row.toString().getBytes());
+ }
+ txnBatch.commit();
+ txnBatch.close();
+ connection.close();
+
+ ArrayList<String> res = queryTable(driver, "select row__id.bucketid, * from testing5.store_sales");
+ for (String re : res) {
+ System.out.println(re);
+ }
+ }
+
+
+ // stream data into streaming table with N buckets, then copy the data into another bucketed table
+ // check if bucketing in both was done in the same way
+ @Test
+ public void testStreamBucketingMatchesRegularBucketing() throws Exception {
+ int bucketCount = 100;
+
+ String dbUri = "raw://" + new Path(dbFolder.newFolder().toString()).toUri().toString();
+ String tableLoc = "'" + dbUri + Path.SEPARATOR + "streamedtable" + "'";
+ String tableLoc2 = "'" + dbUri + Path.SEPARATOR + "finaltable" + "'";
+ String tableLoc3 = "'" + dbUri + Path.SEPARATOR + "nobucket" + "'";
+
+ runDDL(driver, "create database testBucketing3");
+ runDDL(driver, "use testBucketing3");
+ runDDL(driver, "create table streamedtable ( key1 string,key2 int,data string ) clustered by ( key1,key2 ) into "
+ + bucketCount + " buckets stored as orc location " + tableLoc + " TBLPROPERTIES ('transactional'='true')") ;
+// In 'nobucket' table we capture bucketid from streamedtable to workaround a hive bug that prevents joins two identically bucketed tables
+ runDDL(driver, "create table nobucket ( bucketid int, key1 string,key2 int,data string ) location " + tableLoc3) ;
+ runDDL(driver, "create table finaltable ( bucketid int, key1 string,key2 int,data string ) clustered by ( key1,key2 ) into "
+ + bucketCount + " buckets stored as orc location " + tableLoc2 + " TBLPROPERTIES ('transactional'='true')");
+
+
+ String[] records = new String[] {
+ "PSFAHYLZVC,29,EPNMA",
+ "PPPRKWAYAU,96,VUTEE",
+ "MIAOFERCHI,3,WBDSI",
+ "CEGQAZOWVN,0,WCUZL",
+ "XWAKMNSVQF,28,YJVHU",
+ "XBWTSAJWME,2,KDQFO",
+ "FUVLQTAXAY,5,LDSDG",
+ "QTQMDJMGJH,6,QBOMA",
+ "EFLOTLWJWN,71,GHWPS",
+ "PEQNAOJHCM,82,CAAFI",
+ "MOEKQLGZCP,41,RUACR",
+ "QZXMCOPTID,37,LFLWE",
+ "EYALVWICRD,13,JEZLC",
+ "VYWLZAYTXX,16,DMVZX",
+ "OSALYSQIXR,47,HNZVE",
+ "JGKVHKCEGQ,25,KSCJB",
+ "WQFMMYDHET,12,DTRWA",
+ "AJOVAYZKZQ,15,YBKFO",
+ "YAQONWCUAU,31,QJNHZ",
+ "DJBXUEUOEB,35,IYCBL"
+ };
+
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "streamedtable", null);
+ String[] colNames1 = new String[] { "key1", "key2", "data" };
+ DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",", endPt);
+ StreamingConnection connection = endPt.newConnection(false);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(2, wr);
+ txnBatch.beginNextTransaction();
+
+ for (String record : records) {
+ txnBatch.write(record.toString().getBytes());
+ }
+
+ txnBatch.commit();
+ txnBatch.close();
+ connection.close();
+
+ ArrayList<String> res1 = queryTable(driver, "select row__id.bucketid, * from streamedtable order by key2");
+ for (String re : res1) {
+ System.out.println(re);
+ }
+
+ driver.run("insert into nobucket select row__id.bucketid,* from streamedtable");
+ runDDL(driver, " insert into finaltable select * from nobucket");
+ ArrayList<String> res2 = queryTable(driver, "select row__id.bucketid,* from finaltable where row__id.bucketid<>bucketid");
+ for (String s : res2) {
+ LOG.error(s);
+ }
+ Assert.assertTrue(res2.isEmpty());
+ }
+
+
+ private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
String... records) throws Exception {
ValidTxnList txns = msClient.getValidTxns();
- AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(partLocation), conf, txns);
+ AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns);
Assert.assertEquals(0, dir.getObsolete().size());
Assert.assertEquals(0, dir.getOriginalFiles().size());
List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -197,7 +397,7 @@ public class TestStreaming {
for (AcidUtils.ParsedDelta pd : current) System.out.println(pd.getPath().toString());
Assert.assertEquals(numExpectedFiles, current.size());
- // find the absolute mininum transaction
+ // find the absolute minimum transaction
long min = Long.MAX_VALUE;
long max = Long.MIN_VALUE;
for (AcidUtils.ParsedDelta pd : current) {
@@ -209,11 +409,11 @@ public class TestStreaming {
InputFormat inf = new OrcInputFormat();
JobConf job = new JobConf();
- job.set("mapred.input.dir", partLocation.toString());
+ job.set("mapred.input.dir", partitionPath.toString());
job.set("bucket_count", Integer.toString(buckets));
job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
- InputSplit[] splits = inf.getSplits(job, 1);
- Assert.assertEquals(1, splits.length);
+ InputSplit[] splits = inf.getSplits(job, buckets);
+ Assert.assertEquals(buckets, splits.length);
org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
inf.getRecordReader(splits[0], job, Reporter.NULL);
@@ -226,9 +426,9 @@ public class TestStreaming {
Assert.assertEquals(false, rr.next(key, value));
}
- private void checkNothingWritten() throws Exception {
+ private void checkNothingWritten(Path partitionPath) throws Exception {
ValidTxnList txns = msClient.getValidTxns();
- AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(partLocation), conf, txns);
+ AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns);
Assert.assertEquals(0, dir.getObsolete().size());
Assert.assertEquals(0, dir.getOriginalFiles().size());
List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -398,7 +598,7 @@ public class TestStreaming {
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.commit();
- checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
, txnBatch.getCurrentTransactionState());
@@ -410,11 +610,11 @@ public class TestStreaming {
txnBatch.write("2,Welcome to streaming".getBytes());
// data should not be visible
- checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
txnBatch.commit();
- checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
"{2, Welcome to streaming}");
txnBatch.close();
@@ -459,7 +659,7 @@ public class TestStreaming {
txnBatch.write(rec1.getBytes());
txnBatch.commit();
- checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
, txnBatch.getCurrentTransactionState());
@@ -518,7 +718,7 @@ public class TestStreaming {
, txnBatch.getCurrentTransactionState());
++batch;
}
- Assert.assertEquals(0,txnBatch.remainingTransactions());
+ Assert.assertEquals(0, txnBatch.remainingTransactions());
txnBatch.close();
Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
@@ -542,7 +742,7 @@ public class TestStreaming {
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.abort();
- checkNothingWritten();
+ checkNothingWritten(partLoc);
Assert.assertEquals(TransactionBatch.TxnState.ABORTED
, txnBatch.getCurrentTransactionState());
@@ -550,7 +750,7 @@ public class TestStreaming {
txnBatch.close();
connection.close();
- checkNothingWritten();
+ checkNothingWritten(partLoc);
}
@@ -569,7 +769,7 @@ public class TestStreaming {
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.abort();
- checkNothingWritten();
+ checkNothingWritten(partLoc);
Assert.assertEquals(TransactionBatch.TxnState.ABORTED
, txnBatch.getCurrentTransactionState());
@@ -579,8 +779,8 @@ public class TestStreaming {
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.commit();
- checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}",
- "{2, Welcome to streaming}");
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+ "{2, Welcome to streaming}");
txnBatch.close();
connection.close();
@@ -598,14 +798,14 @@ public class TestStreaming {
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.commit();
- checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
txnBatch.beginNextTransaction();
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.commit();
- checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}",
- "{2, Welcome to streaming}");
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+ "{2, Welcome to streaming}");
txnBatch.close();
@@ -615,16 +815,16 @@ public class TestStreaming {
txnBatch.write("3,Hello streaming - once again".getBytes());
txnBatch.commit();
- checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}",
- "{2, Welcome to streaming}", "{3, Hello streaming - once again}");
+ checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
+ "{2, Welcome to streaming}", "{3, Hello streaming - once again}");
txnBatch.beginNextTransaction();
txnBatch.write("4,Welcome to streaming - once again".getBytes());
txnBatch.commit();
- checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}",
- "{2, Welcome to streaming}", "{3, Hello streaming - once again}",
- "{4, Welcome to streaming - once again}");
+ checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
+ "{2, Welcome to streaming}", "{3, Hello streaming - once again}",
+ "{4, Welcome to streaming - once again}");
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
, txnBatch.getCurrentTransactionState());
@@ -655,15 +855,15 @@ public class TestStreaming {
txnBatch1.write("1,Hello streaming".getBytes());
txnBatch2.write("3,Hello streaming - once again".getBytes());
- checkNothingWritten();
+ checkNothingWritten(partLoc);
txnBatch2.commit();
- checkDataWritten(11, 20, 1, 1, "{3, Hello streaming - once again}");
+ checkDataWritten(partLoc, 11, 20, 1, 1, "{3, Hello streaming - once again}");
txnBatch1.commit();
- checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+ checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
txnBatch1.beginNextTransaction();
txnBatch1.write("2,Welcome to streaming".getBytes());
@@ -671,17 +871,17 @@ public class TestStreaming {
txnBatch2.beginNextTransaction();
txnBatch2.write("4,Welcome to streaming - once again".getBytes());
- checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+ checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
txnBatch1.commit();
- checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
"{2, Welcome to streaming}",
"{3, Hello streaming - once again}");
txnBatch2.commit();
- checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
"{2, Welcome to streaming}",
"{3, Hello streaming - once again}",
"{4, Welcome to streaming - once again}");
@@ -772,6 +972,164 @@ public class TestStreaming {
}
}
+
+ private ArrayList<SampleRec> dumpBucket(Path orcFile) throws IOException {
+ org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.getLocal(new Configuration());
+ Reader reader = OrcFile.createReader(orcFile,
+ OrcFile.readerOptions(conf).filesystem(fs));
+
+ RecordReader rows = reader.rows(null);
+ StructObjectInspector inspector = (StructObjectInspector) reader
+ .getObjectInspector();
+
+ System.out.format("Found Bucket File : %s \n", orcFile.getName());
+ ArrayList<SampleRec> result = new ArrayList<SampleRec>();
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ SampleRec rec = (SampleRec) deserializeDeltaFileRow(row, inspector)[5];
+ result.add(rec);
+ }
+
+ return result;
+ }
+
+ // Assumes stored data schema = [acid fields],string,int,string
+ // return array of 6 fields, where the last field has the actual data
+ private static Object[] deserializeDeltaFileRow(Object row, StructObjectInspector inspector) {
+ List<? extends StructField> fields = inspector.getAllStructFieldRefs();
+
+ WritableIntObjectInspector f0ins = (WritableIntObjectInspector) fields.get(0).getFieldObjectInspector();
+ WritableLongObjectInspector f1ins = (WritableLongObjectInspector) fields.get(1).getFieldObjectInspector();
+ WritableIntObjectInspector f2ins = (WritableIntObjectInspector) fields.get(2).getFieldObjectInspector();
+ WritableLongObjectInspector f3ins = (WritableLongObjectInspector) fields.get(3).getFieldObjectInspector();
+ WritableLongObjectInspector f4ins = (WritableLongObjectInspector) fields.get(4).getFieldObjectInspector();
+ StructObjectInspector f5ins = (StructObjectInspector) fields.get(5).getFieldObjectInspector();
+
+ int f0 = f0ins.get(inspector.getStructFieldData(row, fields.get(0)));
+ long f1 = f1ins.get(inspector.getStructFieldData(row, fields.get(1)));
+ int f2 = f2ins.get(inspector.getStructFieldData(row, fields.get(2)));
+ long f3 = f3ins.get(inspector.getStructFieldData(row, fields.get(3)));
+ long f4 = f4ins.get(inspector.getStructFieldData(row, fields.get(4)));
+ SampleRec f5 = deserializeInner(inspector.getStructFieldData(row, fields.get(5)), f5ins);
+
+ return new Object[] {f0, f1, f2, f3, f4, f5};
+ }
+
+ // Assumes row schema => string,int,string
+ private static SampleRec deserializeInner(Object row, StructObjectInspector inspector) {
+ List<? extends StructField> fields = inspector.getAllStructFieldRefs();
+
+ WritableStringObjectInspector f0ins = (WritableStringObjectInspector) fields.get(0).getFieldObjectInspector();
+ WritableIntObjectInspector f1ins = (WritableIntObjectInspector) fields.get(1).getFieldObjectInspector();
+ WritableStringObjectInspector f2ins = (WritableStringObjectInspector) fields.get(2).getFieldObjectInspector();
+
+ String f0 = f0ins.getPrimitiveJavaObject(inspector.getStructFieldData(row, fields.get(0)));
+ int f1 = f1ins.get(inspector.getStructFieldData(row, fields.get(1)));
+ String f2 = f2ins.getPrimitiveJavaObject(inspector.getStructFieldData(row, fields.get(2)));
+ return new SampleRec(f0, f1, f2);
+ }
+
+ @Test
+ public void testBucketing() throws Exception {
+ dropDB(msClient, dbName3);
+ dropDB(msClient, dbName4);
+
+ // 1) Create two bucketed tables
+ String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
+ dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
+ String[] colNames = "key1,key2,data".split(",");
+ String[] colTypes = "string,int,string".split(",");
+ String[] bucketNames = "key1,key2".split(",");
+ int bucketCount = 4;
+ createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames
+ , null, dbLocation, bucketCount);
+
+ String dbLocation2 = dbFolder.newFolder(dbName4).getCanonicalPath() + ".db";
+ dbLocation2 = dbLocation2.replaceAll("\\\\","/"); // for windows paths
+ String[] colNames2 = "key3,key4,data2".split(",");
+ String[] colTypes2 = "string,int,string".split(",");
+ String[] bucketNames2 = "key3,key4".split(",");
+ createDbAndTable(driver, dbName4, tblName4, null, colNames2, colTypes2, bucketNames2
+ , null, dbLocation2, bucketCount);
+
+
+ // 2) Insert data into both tables
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
+ DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt);
+ StreamingConnection connection = endPt.newConnection(false);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.write("name0,1,Hello streaming".getBytes());
+ txnBatch.write("name2,2,Welcome to streaming".getBytes());
+ txnBatch.write("name4,2,more Streaming unlimited".getBytes());
+ txnBatch.write("name5,2,even more Streaming unlimited".getBytes());
+ txnBatch.commit();
+
+
+ HiveEndPoint endPt2 = new HiveEndPoint(metaStoreURI, dbName4, tblName4, null);
+ DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2,",", endPt2);
+ StreamingConnection connection2 = endPt2.newConnection(false);
+ TransactionBatch txnBatch2 = connection2.fetchTransactionBatch(2, writer2);
+ txnBatch2.beginNextTransaction();
+
+ txnBatch2.write("name5,2,fact3".getBytes()); // bucket 0
+ txnBatch2.write("name8,2,fact3".getBytes()); // bucket 1
+ txnBatch2.write("name0,1,fact1".getBytes()); // bucket 2
+ // no data for bucket 3 -- expect 0 length bucket file
+
+
+ txnBatch2.commit();
+
+ // 3 Check data distribution in buckets
+
+ HashMap<Integer, ArrayList<SampleRec>> actual1 = dumpAllBuckets(dbLocation, tblName3);
+ HashMap<Integer, ArrayList<SampleRec>> actual2 = dumpAllBuckets(dbLocation2, tblName4);
+ System.err.println("\n Table 1");
+ System.err.println(actual1);
+ System.err.println("\n Table 2");
+ System.err.println(actual2);
+
+ // assert bucket listing is as expected
+ Assert.assertEquals("number of buckets does not match expectation", actual1.values().size(), 4);
+ Assert.assertEquals("records in bucket does not match expectation", actual1.get(0).size(), 2);
+ Assert.assertEquals("records in bucket does not match expectation", actual1.get(1).size(), 1);
+ Assert.assertEquals("records in bucket does not match expectation", actual1.get(2).size(), 0);
+ Assert.assertEquals("records in bucket does not match expectation", actual1.get(3).size(), 1);
+
+
+ }
+
+
+ // assumes un partitioned table
+ // returns a map<bucketNum, list<record> >
+ private HashMap<Integer, ArrayList<SampleRec>> dumpAllBuckets(String dbLocation, String tableName)
+ throws IOException {
+ HashMap<Integer, ArrayList<SampleRec>> result = new HashMap<Integer, ArrayList<SampleRec>>();
+
+ for (File deltaDir : new File(dbLocation + "/" + tableName).listFiles()) {
+ if(!deltaDir.getName().startsWith("delta"))
+ continue;
+ File[] bucketFiles = deltaDir.listFiles();
+ for (File bucketFile : bucketFiles) {
+ if(bucketFile.toString().endsWith("length"))
+ continue;
+ Integer bucketNum = getBucketNumber(bucketFile);
+ ArrayList<SampleRec> recs = dumpBucket(new Path(bucketFile.toString()));
+ result.put(bucketNum, recs);
+ }
+ }
+ return result;
+ }
+
+ //assumes bucket_NNNNN format of file name
+ private Integer getBucketNumber(File bucketFile) {
+ String fname = bucketFile.getName();
+ int start = fname.indexOf('_');
+ String number = fname.substring(start+1, fname.length());
+ return Integer.parseInt(number);
+ }
+
// delete db and all tables in it
public static void dropDB(IMetaStoreClient client, String databaseName) {
try {
@@ -784,90 +1142,182 @@ public class TestStreaming {
}
- public void createDbAndTable(IMetaStoreClient client, String databaseName,
- String tableName, List<String> partVals)
+
+
+ ///////// -------- UTILS ------- /////////
+ // returns Path of the partition created (if any) else Path of table
+ public static Path createDbAndTable(Driver driver, String databaseName,
+ String tableName, List<String> partVals,
+ String[] colNames, String[] colTypes,
+ String[] bucketCols,
+ String[] partNames, String dbLocation, int bucketCount)
throws Exception {
- Database db = new Database();
- db.setName(databaseName);
- String dbLocation = "raw://" + dbFolder.newFolder(databaseName + ".db").toURI().getPath();
- db.setLocationUri(dbLocation);
- client.createDatabase(db);
-
- Table tbl = new Table();
- tbl.setDbName(databaseName);
- tbl.setTableName(tableName);
- tbl.setTableType(TableType.MANAGED_TABLE.toString());
- StorageDescriptor sd = new StorageDescriptor();
- sd.setCols(getTableColumns());
- sd.setNumBuckets(1);
- sd.setLocation(dbLocation + Path.SEPARATOR + tableName);
- tbl.setPartitionKeys(getPartitionKeys());
-
- tbl.setSd(sd);
-
- sd.setBucketCols(new ArrayList<String>(2));
- sd.setSerdeInfo(new SerDeInfo());
- sd.getSerdeInfo().setName(tbl.getTableName());
- sd.getSerdeInfo().setParameters(new HashMap<String, String>());
- sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
-
- sd.getSerdeInfo().setSerializationLib(OrcSerde.class.getName());
- sd.setInputFormat(HiveInputFormat.class.getName());
- sd.setOutputFormat(OrcOutputFormat.class.getName());
-
- Map<String, String> tableParams = new HashMap<String, String>();
- tbl.setParameters(tableParams);
- client.createTable(tbl);
- try {
- addPartition(client, tbl, partVals);
- } catch (AlreadyExistsException e) {
- }
- Partition createdPartition = client.getPartition(databaseName, tableName, partVals);
- partLocation = createdPartition.getSd().getLocation();
- }
-
- private static void addPartition(IMetaStoreClient client, Table tbl
- , List<String> partValues)
- throws IOException, TException {
- Partition part = new Partition();
- part.setDbName(tbl.getDbName());
- part.setTableName(tblName);
- StorageDescriptor sd = new StorageDescriptor(tbl.getSd());
- sd.setLocation(sd.getLocation() + Path.SEPARATOR + makePartPath(tbl.getPartitionKeys()
- , partValues));
- part.setSd(sd);
- part.setValues(partValues);
- client.add_partition(part);
- }
-
- private static String makePartPath(List<FieldSchema> partKeys, List<String> partVals) {
- if (partKeys.size()!=partVals.size()) {
- throw new IllegalArgumentException("Partition values:" + partVals
- + ", does not match the partition Keys in table :" + partKeys );
- }
- StringBuilder buff = new StringBuilder(partKeys.size()*20);
- buff.append(" ( ");
- int i=0;
- for (FieldSchema schema : partKeys) {
- buff.append(schema.getName());
- buff.append("='");
- buff.append(partVals.get(i));
- buff.append("'");
- if (i!=partKeys.size()-1) {
- buff.append(Path.SEPARATOR);
+ String dbUri = "raw://" + new Path(dbLocation).toUri().toString();
+ String tableLoc = dbUri + Path.SEPARATOR + tableName;
+
+ runDDL(driver, "create database IF NOT EXISTS " + databaseName + " location '" + dbUri + "'");
+ runDDL(driver, "use " + databaseName);
+ String crtTbl = "create table " + tableName +
+ " ( " + getTableColumnsStr(colNames,colTypes) + " )" +
+ getPartitionStmtStr(partNames) +
+ " clustered by ( " + join(bucketCols, ",") + " )" +
+ " into " + bucketCount + " buckets " +
+ " stored as orc " +
+ " location '" + tableLoc + "'";
+ runDDL(driver, crtTbl);
+ if(partNames!=null && partNames.length!=0) {
+ return addPartition(driver, tableName, partVals, partNames);
+ }
+ return new Path(tableLoc);
+ }
+
+ private static Path addPartition(Driver driver, String tableName, List<String> partVals, String[] partNames) throws QueryFailedException, CommandNeedRetryException, IOException {
+ String partSpec = getPartsSpec(partNames, partVals);
+ String addPart = "alter table " + tableName + " add partition ( " + partSpec + " )";
+ runDDL(driver, addPart);
+ return getPartitionPath(driver, tableName, partSpec);
+ }
+
+ private static Path getPartitionPath(Driver driver, String tableName, String partSpec) throws CommandNeedRetryException, IOException {
+ ArrayList<String> res = queryTable(driver, "describe extended " + tableName + " PARTITION (" + partSpec + ")");
+ String partInfo = res.get(res.size() - 1);
+ int start = partInfo.indexOf("location:") + "location:".length();
+ int end = partInfo.indexOf(",",start);
+ return new Path( partInfo.substring(start,end) );
+ }
+
+ private static String getTableColumnsStr(String[] colNames, String[] colTypes) {
+ StringBuffer sb = new StringBuffer();
+ for (int i=0; i < colNames.length; ++i) {
+ sb.append(colNames[i] + " " + colTypes[i]);
+ if (i<colNames.length-1) {
+ sb.append(",");
}
- ++i;
}
- buff.append(" )");
- return buff.toString();
+ return sb.toString();
}
+ // converts partNames into "partName1 string, partName2 string"
+ private static String getTablePartsStr(String[] partNames) {
+ if (partNames==null || partNames.length==0) {
+ return "";
+ }
+ StringBuffer sb = new StringBuffer();
+ for (int i=0; i < partNames.length; ++i) {
+ sb.append(partNames[i] + " string");
+ if (i < partNames.length-1) {
+ sb.append(",");
+ }
+ }
+ return sb.toString();
+ }
- private static List<FieldSchema> getTableColumns() {
- List<FieldSchema> fields = new ArrayList<FieldSchema>();
- fields.add(new FieldSchema(COL1, serdeConstants.INT_TYPE_NAME, ""));
- fields.add(new FieldSchema(COL2, serdeConstants.STRING_TYPE_NAME, ""));
- return fields;
+ // converts partNames,partVals into "partName1=val1, partName2=val2"
+ private static String getPartsSpec(String[] partNames, List<String> partVals) {
+ StringBuffer sb = new StringBuffer();
+ for (int i=0; i < partVals.size(); ++i) {
+ sb.append(partNames[i] + " = '" + partVals.get(i) + "'");
+ if(i < partVals.size()-1) {
+ sb.append(",");
+ }
+ }
+ return sb.toString();
+ }
+
+ private static String join(String[] values, String delimiter) {
+ if(values==null)
+ return null;
+ StringBuffer strbuf = new StringBuffer();
+
+ boolean first = true;
+
+ for (Object value : values) {
+ if (!first) { strbuf.append(delimiter); } else { first = false; }
+ strbuf.append(value.toString());
+ }
+
+ return strbuf.toString();
+ }
+ private static String getPartitionStmtStr(String[] partNames) {
+ if ( partNames == null || partNames.length == 0) {
+ return "";
+ }
+ return " partitioned by (" + getTablePartsStr(partNames) + " )";
+ }
+
+ private static boolean runDDL(Driver driver, String sql) throws QueryFailedException {
+ LOG.debug(sql);
+ System.out.println(sql);
+ int retryCount = 1; // # of times to retry if first attempt fails
+ for (int attempt=0; attempt <= retryCount; ++attempt) {
+ try {
+ //LOG.debug("Running Hive Query: "+ sql);
+ CommandProcessorResponse cpr = driver.run(sql);
+ if(cpr.getResponseCode() == 0) {
+ return true;
+ }
+ LOG.error("Statement: " + sql + " failed: " + cpr);
+ } catch (CommandNeedRetryException e) {
+ if (attempt == retryCount) {
+ throw new QueryFailedException(sql, e);
+ }
+ continue;
+ }
+ } // for
+ return false;
+ }
+
+
+ public static ArrayList<String> queryTable(Driver driver, String query)
+ throws CommandNeedRetryException, IOException {
+ driver.run(query);
+ ArrayList<String> res = new ArrayList<String>();
+ driver.getResults(res);
+ if(res.isEmpty())
+ System.err.println(driver.getErrorMsg());
+ return res;
+ }
+
+ private static class SampleRec {
+ public String field1;
+ public int field2;
+ public String field3;
+
+ public SampleRec(String field1, int field2, String field3) {
+ this.field1 = field1;
+ this.field2 = field2;
+ this.field3 = field3;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ SampleRec that = (SampleRec) o;
+
+ if (field2 != that.field2) return false;
+ if (field1 != null ? !field1.equals(that.field1) : that.field1 != null) return false;
+ return !(field3 != null ? !field3.equals(that.field3) : that.field3 != null);
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = field1 != null ? field1.hashCode() : 0;
+ result = 31 * result + field2;
+ result = 31 * result + (field3 != null ? field3.hashCode() : 0);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return " { " +
+ "'" + field1 + '\'' +
+ "," + field2 +
+ ",'" + field3 + '\'' +
+ " }";
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0ca9ff86/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
index 00a6384..54ae48e 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
@@ -494,6 +494,35 @@ public final class ObjectInspectorUtils {
}
}
+ /**
+ * Computes the bucket number to which the bucketFields belong to
+ * @param bucketFields the bucketed fields of the row
+ * @param bucketFieldInspectors the ObjectInpsectors for each of the bucketed fields
+ * @param totalBuckets the number of buckets in the table
+ * @return the bucket number
+ */
+ public static int getBucketNumber(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors, int totalBuckets) {
+ int hashCode = getBucketHashCode(bucketFields, bucketFieldInspectors);
+ int bucketID = (hashCode & Integer.MAX_VALUE) % totalBuckets;
+ return bucketID;
+ }
+
+ /**
+ * Computes the hash code for the given bucketed fields
+ * @param bucketFields
+ * @param bucketFieldInspectors
+ * @return
+ */
+ private static int getBucketHashCode(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors) {
+ int hashCode = 0;
+ for (int i = 0; i < bucketFields.length; i++) {
+ int fieldHash = ObjectInspectorUtils.hashCode(bucketFields[i], bucketFieldInspectors[i]);
+ hashCode = 31 * hashCode + fieldHash;
+ }
+ return hashCode;
+ }
+
+
public static int hashCode(Object o, ObjectInspector objIns) {
if (o == null) {
return 0;
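A usage sketch for the getBucketNumber() helper added above, using the standard primitive inspectors from PrimitiveObjectInspectorFactory (the column values are made up; this assumes the serde2 classes are on the classpath):

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

// Sketch: compute the bucket for a row of a table clustered by (key1 string, key2 int) into 4 buckets.
public class GetBucketNumberSketch {
  public static void main(String[] args) {
    Object[] bucketFields = new Object[] {"name0", Integer.valueOf(1)};  // hypothetical column values
    ObjectInspector[] inspectors = new ObjectInspector[] {
        PrimitiveObjectInspectorFactory.javaStringObjectInspector,
        PrimitiveObjectInspectorFactory.javaIntObjectInspector
    };
    int bucket = ObjectInspectorUtils.getBucketNumber(bucketFields, inspectors, 4);
    System.out.println("record goes to bucket " + bucket);              // value in [0, 4)
  }
}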