Posted to commits@hive.apache.org by se...@apache.org on 2015/10/05 21:41:40 UTC

[21/23] hive git commit: HIVE-11983 - Hive streaming API uses incorrect logic to assign buckets to incoming records (Roshan Naik via Eugene Koifman)

HIVE-11983 - Hive streaming API uses incorrect logic to assign buckets to incoming records (Roshan Naik via Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0ca9ff86
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0ca9ff86
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0ca9ff86

Branch: refs/heads/llap
Commit: 0ca9ff86514f5e76cb0cd99022d8d5d1cf39e626
Parents: 8964c1e
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Mon Oct 5 11:06:58 2015 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Mon Oct 5 11:06:58 2015 -0700

----------------------------------------------------------------------
 hcatalog/streaming/pom.xml                      |   7 +
 .../streaming/AbstractRecordWriter.java         |  93 ++-
 .../streaming/DelimitedInputWriter.java         |  54 +-
 .../hcatalog/streaming/StrictJsonWriter.java    |  46 +-
 .../hive/hcatalog/streaming/TestStreaming.java  | 698 +++++++++++++++----
 .../objectinspector/ObjectInspectorUtils.java   |  29 +
 6 files changed, 773 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0ca9ff86/hcatalog/streaming/pom.xml
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/pom.xml b/hcatalog/streaming/pom.xml
index 6d03ce1..ba9f731 100644
--- a/hcatalog/streaming/pom.xml
+++ b/hcatalog/streaming/pom.xml
@@ -104,6 +104,13 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+      <scope>test</scope>
+      <version>${hadoop-23.version}</version>
+    </dependency>
+
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/hive/blob/0ca9ff86/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
index c959222..a2cd2f5 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -32,13 +33,20 @@ import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hive.hcatalog.common.HCatUtil;
 import org.apache.thrift.TException;
 
 import java.io.IOException;
 
-import java.util.Random;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
 
 abstract class AbstractRecordWriter implements RecordWriter {
   static final private Log LOG = LogFactory.getLog(AbstractRecordWriter.class.getName());
@@ -48,14 +56,15 @@ abstract class AbstractRecordWriter implements RecordWriter {
   final Table tbl;
 
   final IMetaStoreClient msClient;
-  RecordUpdater updater = null;
+  protected final List<Integer> bucketIds;
+  ArrayList<RecordUpdater> updaters = null;
+
+  public final int totalBuckets;
 
-  private final int totalBuckets;
-  private Random rand = new Random();
-  private int currentBucketId = 0;
   private final Path partitionPath;
 
   final AcidOutputFormat<?,?> outf;
+  private Object[] bucketFieldData; // Pre-allocated in constructor. Updated on each write.
 
   protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf)
           throws ConnectionError, StreamingException {
@@ -71,8 +80,11 @@ abstract class AbstractRecordWriter implements RecordWriter {
         throw new StreamingException("Cannot stream to table that has not been bucketed : "
                 + endPoint);
       }
+      this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols()) ;
+      this.bucketFieldData = new Object[bucketIds.size()];
       String outFormatName = this.tbl.getSd().getOutputFormat();
       outf = (AcidOutputFormat<?,?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf);
+      bucketFieldData = new Object[bucketIds.size()];
     } catch (MetaException e) {
       throw new ConnectionError(endPoint, e);
     } catch (NoSuchObjectException e) {
@@ -86,17 +98,37 @@ abstract class AbstractRecordWriter implements RecordWriter {
     }
   }
 
-  protected AbstractRecordWriter(HiveEndPoint endPoint)
-          throws ConnectionError, StreamingException {
-    this(endPoint, HiveEndPoint.createHiveConf(AbstractRecordWriter.class, endPoint.metaStoreUri) );
+  // return the column numbers of the bucketed columns
+  private List<Integer> getBucketColIDs(List<String> bucketCols, List<FieldSchema> cols) {
+    ArrayList<Integer> result =  new ArrayList<Integer>(bucketCols.size());
+    HashSet<String> bucketSet = new HashSet<String>(bucketCols);
+    for (int i = 0; i < cols.size(); i++) {
+      if( bucketSet.contains(cols.get(i).getName()) ) {
+        result.add(i);
+      }
+    }
+    return result;
   }
 
   abstract SerDe getSerde() throws SerializationError;
 
+  protected abstract ObjectInspector[] getBucketObjectInspectors();
+  protected abstract StructObjectInspector getRecordObjectInspector();
+  protected abstract StructField[] getBucketStructFields();
+
+  // returns the bucket number to which the record belongs
+  protected int getBucket(Object row) throws SerializationError {
+    ObjectInspector[] inspectors = getBucketObjectInspectors();
+    Object[] bucketFields = getBucketFields(row);
+    return ObjectInspectorUtils.getBucketNumber(bucketFields, inspectors, totalBuckets);
+  }
+
   @Override
   public void flush() throws StreamingIOFailure {
     try {
-      updater.flush();
+      for (RecordUpdater updater : updaters) {
+        updater.flush();
+      }
     } catch (IOException e) {
       throw new StreamingIOFailure("Unable to flush recordUpdater", e);
     }
@@ -116,9 +148,8 @@ abstract class AbstractRecordWriter implements RecordWriter {
   public void newBatch(Long minTxnId, Long maxTxnID)
           throws StreamingIOFailure, SerializationError {
     try {
-      this.currentBucketId = rand.nextInt(totalBuckets);
       LOG.debug("Creating Record updater");
-      updater = createRecordUpdater(currentBucketId, minTxnId, maxTxnID);
+      updaters = createRecordUpdaters(totalBuckets, minTxnId, maxTxnID);
     } catch (IOException e) {
       LOG.error("Failed creating record updater", e);
       throw new StreamingIOFailure("Unable to get new record Updater", e);
@@ -128,13 +159,49 @@ abstract class AbstractRecordWriter implements RecordWriter {
   @Override
   public void closeBatch() throws StreamingIOFailure {
     try {
-      updater.close(false);
-      updater = null;
+      for (RecordUpdater updater : updaters) {
+        updater.close(false);
+      }
+      updaters.clear();
     } catch (IOException e) {
       throw new StreamingIOFailure("Unable to close recordUpdater", e);
     }
   }
 
+  protected static ObjectInspector[] getObjectInspectorsForBucketedCols(List<Integer> bucketIds
+          , StructObjectInspector recordObjInspector)
+          throws SerializationError {
+    ObjectInspector[] result = new ObjectInspector[bucketIds.size()];
+
+    for (int i = 0; i < bucketIds.size(); i++) {
+      int bucketId = bucketIds.get(i);
+      result[i] =
+              recordObjInspector.getAllStructFieldRefs().get( bucketId ).getFieldObjectInspector();
+    }
+    return result;
+  }
+
+
+  private Object[] getBucketFields(Object row) throws SerializationError {
+    StructObjectInspector recordObjInspector = getRecordObjectInspector();
+    StructField[] bucketStructFields = getBucketStructFields();
+    for (int i = 0; i < bucketIds.size(); i++) {
+      bucketFieldData[i] = recordObjInspector.getStructFieldData(row,  bucketStructFields[i]);
+    }
+    return bucketFieldData;
+  }
+
+
+
+  private ArrayList<RecordUpdater> createRecordUpdaters(int bucketCount, Long minTxnId, Long maxTxnID)
+          throws IOException, SerializationError {
+    ArrayList<RecordUpdater> result = new ArrayList<RecordUpdater>(bucketCount);
+    for (int bucket = 0; bucket < bucketCount; bucket++) {
+      result.add(createRecordUpdater(bucket, minTxnId, maxTxnID) );
+    }
+    return result;
+  }
+
   private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxTxnID)
           throws IOException, SerializationError {
     try {

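Summary of the change above: AbstractRecordWriter previously chose one random bucket per transaction batch and wrote every record through a single RecordUpdater. It now resolves the positions of the table's bucketed columns, keeps one RecordUpdater per bucket, and computes the destination bucket from each record's bucketed-column values. A minimal sketch of that per-record path, assuming Hive's serde2 ObjectInspector classes are on the classpath (the helper name is illustrative, not part of the patch):

  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
  import org.apache.hadoop.hive.serde2.objectinspector.StructField;
  import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

  // Illustrative helper mirroring AbstractRecordWriter.getBucket()/getBucketFields() above.
  final class BucketRouting {
    static int bucketFor(Object row,
                         StructObjectInspector recordObjInspector,
                         StructField[] bucketStructFields,
                         ObjectInspector[] bucketObjInspectors,
                         int totalBuckets) {
      // Pull only the bucketed columns out of the row ...
      Object[] bucketFields = new Object[bucketStructFields.length];
      for (int i = 0; i < bucketStructFields.length; i++) {
        bucketFields[i] = recordObjInspector.getStructFieldData(row, bucketStructFields[i]);
      }
      // ... and hash them into [0, totalBuckets); the caller then writes the row
      // through updaters.get(bucket) rather than a randomly chosen updater.
      return ObjectInspectorUtils.getBucketNumber(bucketFields, bucketObjInspectors, totalBuckets);
    }
  }
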
http://git-wip-us.apache.org/repos/asf/hive/blob/0ca9ff86/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
index 6dc69f0..fd36a38 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
@@ -26,12 +26,14 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.io.BytesWritable;
 
 import java.io.IOException;
@@ -51,7 +53,11 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
   private char serdeSeparator;
   private int[] fieldToColMapping;
   private final ArrayList<String> tableColumns;
-  private AbstractSerDe serde = null;
+  private LazySimpleSerDe serde = null;
+
+  private final LazySimpleStructObjectInspector recordObjInspector;
+  private final ObjectInspector[] bucketObjInspectors;
+  private final StructField[] bucketStructFields;
 
   static final private Log LOG = LogFactory.getLog(DelimitedInputWriter.class.getName());
 
@@ -120,6 +126,22 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
     this.reorderingNeeded = isReorderingNeeded(delimiter, getTableColumns());
     LOG.debug("Field reordering needed = " + this.reorderingNeeded + ", for endpoint " + endPoint);
     this.serdeSeparator = serdeSeparator;
+    this.serde = createSerde(tbl, conf, serdeSeparator);
+
+    // get ObjInspectors for entire record and bucketed cols
+    try {
+      this.recordObjInspector = (LazySimpleStructObjectInspector) serde.getObjectInspector();
+      this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
+    } catch (SerDeException e) {
+      throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
+    }
+
+    // get StructFields for bucketed cols
+    bucketStructFields = new StructField[bucketIds.size()];
+    List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
+    for (int i = 0; i < bucketIds.size(); i++) {
+      bucketStructFields[i] = allFields.get(bucketIds.get(i));
+    }
   }
 
   private boolean isReorderingNeeded(String delimiter, ArrayList<String> tableColumns) {
@@ -173,14 +195,14 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
     }
     String[] reorderedFields = new String[getTableColumns().size()];
     String decoded = new String(record);
-    String[] fields = decoded.split(delimiter);
+    String[] fields = decoded.split(delimiter,-1);
     for (int i=0; i<fieldToColMapping.length; ++i) {
       int newIndex = fieldToColMapping[i];
       if(newIndex != -1) {
         reorderedFields[newIndex] = fields[i];
       }
     }
-    return join(reorderedFields,getSerdeSeparator());
+    return join(reorderedFields, getSerdeSeparator());
   }
 
   // handles nulls in items[]
@@ -212,7 +234,8 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
     try {
       byte[] orderedFields = reorderFields(record);
       Object encodedRow = encode(orderedFields);
-      updater.insert(transactionId, encodedRow);
+      int bucket = getBucket(encodedRow);
+      updaters.get(bucket).insert(transactionId, encodedRow);
     } catch (IOException e) {
       throw new StreamingIOFailure("Error writing record in transaction ("
               + transactionId + ")", e);
@@ -221,13 +244,22 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
 
   @Override
   SerDe getSerde() throws SerializationError {
-    if(serde!=null) {
-      return serde;
-    }
-    serde = createSerde(tbl, conf);
     return serde;
   }
 
+  protected LazySimpleStructObjectInspector getRecordObjectInspector() {
+    return recordObjInspector;
+  }
+
+  @Override
+  protected StructField[] getBucketStructFields() {
+    return bucketStructFields;
+  }
+
+  protected ObjectInspector[] getBucketObjectInspectors() {
+    return bucketObjInspectors;
+  }
+
   private Object encode(byte[] record) throws SerializationError {
     try {
       BytesWritable blob = new BytesWritable();
@@ -244,7 +276,7 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
    * @throws SerializationError if serde could not be initialized
    * @param tbl
    */
-  protected LazySimpleSerDe createSerde(Table tbl, HiveConf conf)
+  protected static LazySimpleSerDe createSerde(Table tbl, HiveConf conf, char serdeSeparator)
           throws SerializationError {
     try {
       Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);

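With this change DelimitedInputWriter.write() encodes the delimited record, asks the base class for its bucket, and inserts through the updater for that bucket. From the client's point of view the streaming API is unchanged; below is a minimal usage sketch using only calls exercised in TestStreaming later in this patch (database, table and column names are taken from the testBucketing test and are placeholders; the null metastore URI matches the test setup):

  import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
  import org.apache.hive.hcatalog.streaming.HiveEndPoint;
  import org.apache.hive.hcatalog.streaming.StreamingConnection;
  import org.apache.hive.hcatalog.streaming.TransactionBatch;

  public class StreamToBucketedTable {
    public static void main(String[] args) throws Exception {
      // Target table is assumed to be transactional and clustered by (key1, key2).
      HiveEndPoint endPt = new HiveEndPoint(null, "testing3", "dimensionTable", null);
      DelimitedInputWriter writer =
          new DelimitedInputWriter(new String[]{"key1", "key2", "data"}, ",", endPt);
      StreamingConnection connection = endPt.newConnection(false);

      TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
      txnBatch.beginNextTransaction();
      // Each record now lands in the bucket file derived from its key1/key2 values,
      // instead of a bucket picked at random for the whole batch.
      txnBatch.write("name0,1,Hello streaming".getBytes());
      txnBatch.write("name2,2,Welcome to streaming".getBytes());
      txnBatch.commit();

      txnBatch.close();
      connection.close();
    }
  }
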
http://git-wip-us.apache.org/repos/asf/hive/blob/0ca9ff86/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
index 6d6beb8..6ab21eb 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
@@ -24,10 +24,14 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.io.Text;
+import org.apache.hive.hcatalog.data.HCatRecordObjectInspector;
 import org.apache.hive.hcatalog.data.JsonSerDe;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Properties;
 
 /**
@@ -37,6 +41,10 @@ import java.util.Properties;
 public class StrictJsonWriter extends AbstractRecordWriter {
   private JsonSerDe serde;
 
+  private final HCatRecordObjectInspector recordObjInspector;
+  private final ObjectInspector[] bucketObjInspectors;
+  private final StructField[] bucketStructFields;
+
   /**
    *
    * @param endPoint the end point to write to
@@ -46,7 +54,7 @@ public class StrictJsonWriter extends AbstractRecordWriter {
    */
   public StrictJsonWriter(HiveEndPoint endPoint)
           throws ConnectionError, SerializationError, StreamingException {
-    super(endPoint, null);
+    this(endPoint, null);
   }
 
   /**
@@ -60,23 +68,49 @@ public class StrictJsonWriter extends AbstractRecordWriter {
   public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf)
           throws ConnectionError, SerializationError, StreamingException {
     super(endPoint, conf);
+    this.serde = createSerde(tbl, conf);
+    // get ObjInspectors for entire record and bucketed cols
+    try {
+      recordObjInspector = ( HCatRecordObjectInspector ) serde.getObjectInspector();
+      this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
+    } catch (SerDeException e) {
+      throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
+    }
+
+    // get StructFields for bucketed cols
+    bucketStructFields = new StructField[bucketIds.size()];
+    List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
+    for (int i = 0; i < bucketIds.size(); i++) {
+      bucketStructFields[i] = allFields.get(bucketIds.get(i));
+    }
   }
 
   @Override
   SerDe getSerde() throws SerializationError {
-    if(serde!=null) {
-      return serde;
-    }
-    serde = createSerde(tbl, conf);
     return serde;
   }
 
+  protected HCatRecordObjectInspector getRecordObjectInspector() {
+    return recordObjInspector;
+  }
+
+  @Override
+  protected StructField[] getBucketStructFields() {
+    return bucketStructFields;
+  }
+
+  protected ObjectInspector[] getBucketObjectInspectors() {
+    return bucketObjInspectors;
+  }
+
+
   @Override
   public void write(long transactionId, byte[] record)
           throws StreamingIOFailure, SerializationError {
     try {
       Object encodedRow = encode(record);
-      updater.insert(transactionId, encodedRow);
+      int bucket = getBucket(encodedRow);
+      updaters.get(bucket).insert(transactionId, encodedRow);
     } catch (IOException e) {
       throw new StreamingIOFailure("Error writing record in transaction("
               + transactionId + ")", e);

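StrictJsonWriter gets the same treatment: the JsonSerDe and its HCatRecordObjectInspector are now set up in the constructor, and write() routes each encoded row to updaters.get(getBucket(encodedRow)). On the client side only the writer type and payload format change relative to the delimited example above; a hedged fragment continuing that example (the JSON keys are assumed to match the table's column names):

  // Same connection / transaction-batch flow as the delimited example;
  // records are JSON objects instead of delimited text.
  StrictJsonWriter jsonWriter = new StrictJsonWriter(endPt);
  TransactionBatch batch = connection.fetchTransactionBatch(2, jsonWriter);
  batch.beginNextTransaction();
  batch.write("{\"key1\":\"name0\",\"key2\":1,\"data\":\"Hello streaming\"}".getBytes());
  batch.commit();
  batch.close();
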
http://git-wip-us.apache.org/repos/asf/hive/blob/0ca9ff86/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index c28d4aa..2f6baec 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -18,33 +18,38 @@
 
 package org.apache.hive.hcatalog.streaming;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
-import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
-import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
-import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
 import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService;
 import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
@@ -68,7 +73,6 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 
@@ -118,6 +122,7 @@ public class TestStreaming {
   private static final String COL2 = "msg";
 
   private final HiveConf conf;
+  private Driver driver;
   private final IMetaStoreClient msClient;
 
   final String metaStoreURI = null;
@@ -127,13 +132,23 @@ public class TestStreaming {
   private final static String tblName = "alerts";
   private final static String[] fieldNames = new String[]{COL1,COL2};
   List<String> partitionVals;
-  private static String partLocation;
+  private static Path partLoc;
+  private static Path partLoc2;
 
   // unpartitioned table
-  private final static String dbName2 = "testing";
+  private final static String dbName2 = "testing2";
   private final static String tblName2 = "alerts";
   private final static String[] fieldNames2 = new String[]{COL1,COL2};
 
+
+  // for bucket join testing
+  private final static String dbName3 = "testing3";
+  private final static String tblName3 = "dimensionTable";
+  private final static String dbName4 = "testing4";
+  private final static String tblName4 = "factTable";
+  List<String> partitionVals2;
+
+
   private final String PART1_CONTINENT = "Asia";
   private final String PART1_COUNTRY = "India";
 
@@ -146,14 +161,21 @@ public class TestStreaming {
     partitionVals.add(PART1_CONTINENT);
     partitionVals.add(PART1_COUNTRY);
 
+    partitionVals2 = new ArrayList<String>(1);
+    partitionVals2.add(PART1_COUNTRY);
+
+
     conf = new HiveConf(this.getClass());
     conf.set("fs.raw.impl", RawFileSystem.class.getName());
+    conf.set("hive.enforce.bucketing", "true");
     TxnDbUtil.setConfValues(conf);
     if (metaStoreURI!=null) {
       conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI);
     }
     conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
     conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+    dbFolder.create();
+
 
     //1) Start from a clean slate (metastore)
     TxnDbUtil.cleanDb();
@@ -165,17 +187,35 @@ public class TestStreaming {
 
   @Before
   public void setup() throws Exception {
+    SessionState.start(new CliSessionState(conf));
+    driver = new Driver(conf);
+    driver.setMaxRows(200002);//make sure Driver returns all results
     // drop and recreate the necessary databases and tables
     dropDB(msClient, dbName);
-    createDbAndTable(msClient, dbName, tblName, partitionVals);
+
+    String[] colNames = new String[] {COL1, COL2};
+    String[] colTypes = new String[] {serdeConstants.INT_TYPE_NAME, serdeConstants.STRING_TYPE_NAME};
+    String[] bucketCols = new String[] {COL1};
+    String loc1 = dbFolder.newFolder(dbName + ".db").toString();
+    String[] partNames = new String[]{"Continent", "Country"};
+    partLoc = createDbAndTable(driver, dbName, tblName, partitionVals, colNames, colTypes, bucketCols, partNames, loc1, 1);
 
     dropDB(msClient, dbName2);
-    createDbAndTable(msClient, dbName2, tblName2, partitionVals);
+    String loc2 = dbFolder.newFolder(dbName2 + ".db").toString();
+    partLoc2 = createDbAndTable(driver, dbName2, tblName2, partitionVals, colNames, colTypes, bucketCols, partNames, loc2, 2);
+
+    String loc3 = dbFolder.newFolder("testing5.db").toString();
+    createStoreSales("testing5", loc3);
+
+    runDDL(driver, "drop table testBucketing3.streamedtable");
+    runDDL(driver, "drop table testBucketing3.finaltable");
+    runDDL(driver, "drop table testBucketing3.nobucket");
   }
 
   @After
   public void cleanup() throws Exception {
     msClient.close();
+    driver.close();
   }
 
   private static List<FieldSchema> getPartitionKeys() {
@@ -186,10 +226,170 @@ public class TestStreaming {
     return fields;
   }
 
-  private void checkDataWritten(long minTxn, long maxTxn, int buckets, int numExpectedFiles,
+  private void createStoreSales(String dbName, String loc) throws Exception {
+    String dbUri = "raw://" + new Path(loc).toUri().toString();
+    String tableLoc = dbUri + Path.SEPARATOR + "store_sales";
+
+    boolean success = runDDL(driver, "create database IF NOT EXISTS " + dbName + " location '" + dbUri + "'");
+    Assert.assertTrue(success);
+    success = runDDL(driver, "use " + dbName);
+    Assert.assertTrue(success);
+
+    success = runDDL(driver, "drop table if exists store_sales");
+    Assert.assertTrue(success);
+    success = runDDL(driver, "create table store_sales\n" +
+      "(\n" +
+      "    ss_sold_date_sk           int,\n" +
+      "    ss_sold_time_sk           int,\n" +
+      "    ss_item_sk                int,\n" +
+      "    ss_customer_sk            int,\n" +
+      "    ss_cdemo_sk               int,\n" +
+      "    ss_hdemo_sk               int,\n" +
+      "    ss_addr_sk                int,\n" +
+      "    ss_store_sk               int,\n" +
+      "    ss_promo_sk               int,\n" +
+      "    ss_ticket_number          int,\n" +
+      "    ss_quantity               int,\n" +
+      "    ss_wholesale_cost         decimal(7,2),\n" +
+      "    ss_list_price             decimal(7,2),\n" +
+      "    ss_sales_price            decimal(7,2),\n" +
+      "    ss_ext_discount_amt       decimal(7,2),\n" +
+      "    ss_ext_sales_price        decimal(7,2),\n" +
+      "    ss_ext_wholesale_cost     decimal(7,2),\n" +
+      "    ss_ext_list_price         decimal(7,2),\n" +
+      "    ss_ext_tax                decimal(7,2),\n" +
+      "    ss_coupon_amt             decimal(7,2),\n" +
+      "    ss_net_paid               decimal(7,2),\n" +
+      "    ss_net_paid_inc_tax       decimal(7,2),\n" +
+      "    ss_net_profit             decimal(7,2)\n" +
+      ")\n" +
+      " partitioned by (dt string)\n" +
+      "clustered by (ss_store_sk, ss_promo_sk)\n" +
+      "INTO 4 BUCKETS stored as orc " + " location '" + tableLoc +  "'" + "  TBLPROPERTIES ('orc.compress'='NONE', 'transactional'='true')");
+    Assert.assertTrue(success);
+
+    success = runDDL(driver, "alter table store_sales add partition(dt='2015')");
+    Assert.assertTrue(success);
+  }
+  /**
+   * make sure it works with a table where the bucket col is not the 1st col
+   * @throws Exception
+   */
+  @Test
+  public void testBucketingWhereBucketColIsNotFirstCol() throws Exception {
+    List<String> partitionVals = new ArrayList<String>();
+    partitionVals.add("2015");
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testing5", "store_sales", partitionVals);
+    DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"ss_sold_date_sk","ss_sold_time_sk", "ss_item_sk",
+      "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", "ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity",
+      "ss_wholesale_cost", "ss_list_price", "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price", "ss_ext_wholesale_cost",
+      "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", "ss_net_profit"},",", endPt);
+    StreamingConnection connection = endPt.newConnection(false, null);//should this really be null?
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
+    txnBatch.beginNextTransaction();
+
+    StringBuilder row = new StringBuilder();
+    for(int i = 0; i < 10; i++) {
+      for(int ints = 0; ints < 11; ints++) {
+        row.append(ints).append(',');
+      }
+      for(int decs = 0; decs < 12; decs++) {
+        row.append(i + 0.1).append(',');
+      }
+      row.setLength(row.length() - 1);
+      txnBatch.write(row.toString().getBytes());
+    }
+    txnBatch.commit();
+    txnBatch.close();
+    connection.close();
+
+    ArrayList<String> res = queryTable(driver, "select row__id.bucketid, * from testing5.store_sales");
+    for (String re : res) {
+      System.out.println(re);
+    }
+  }
+
+
+  // stream data into streaming table with N buckets, then copy the data into another bucketed table
+  // check if bucketing in both was done in the same way
+  @Test
+  public void testStreamBucketingMatchesRegularBucketing() throws Exception {
+    int bucketCount = 100;
+
+    String dbUri = "raw://" + new Path(dbFolder.newFolder().toString()).toUri().toString();
+    String tableLoc  = "'" + dbUri + Path.SEPARATOR + "streamedtable" + "'";
+    String tableLoc2 = "'" + dbUri + Path.SEPARATOR + "finaltable" + "'";
+    String tableLoc3 = "'" + dbUri + Path.SEPARATOR + "nobucket" + "'";
+
+    runDDL(driver, "create database testBucketing3");
+    runDDL(driver, "use testBucketing3");
+    runDDL(driver, "create table streamedtable ( key1 string,key2 int,data string ) clustered by ( key1,key2 ) into "
+            + bucketCount + " buckets  stored as orc  location " + tableLoc + " TBLPROPERTIES ('transactional'='true')") ;
+//  In the 'nobucket' table we capture bucketid from streamedtable to work around a hive bug that prevents joining two identically bucketed tables
+    runDDL(driver, "create table nobucket ( bucketid int, key1 string,key2 int,data string ) location " + tableLoc3) ;
+    runDDL(driver, "create table finaltable ( bucketid int, key1 string,key2 int,data string ) clustered by ( key1,key2 ) into "
+            + bucketCount + " buckets  stored as orc location " + tableLoc2 + " TBLPROPERTIES ('transactional'='true')");
+
+
+    String[] records = new String[] {
+    "PSFAHYLZVC,29,EPNMA",
+    "PPPRKWAYAU,96,VUTEE",
+    "MIAOFERCHI,3,WBDSI",
+    "CEGQAZOWVN,0,WCUZL",
+    "XWAKMNSVQF,28,YJVHU",
+    "XBWTSAJWME,2,KDQFO",
+    "FUVLQTAXAY,5,LDSDG",
+    "QTQMDJMGJH,6,QBOMA",
+    "EFLOTLWJWN,71,GHWPS",
+    "PEQNAOJHCM,82,CAAFI",
+    "MOEKQLGZCP,41,RUACR",
+    "QZXMCOPTID,37,LFLWE",
+    "EYALVWICRD,13,JEZLC",
+    "VYWLZAYTXX,16,DMVZX",
+    "OSALYSQIXR,47,HNZVE",
+    "JGKVHKCEGQ,25,KSCJB",
+    "WQFMMYDHET,12,DTRWA",
+    "AJOVAYZKZQ,15,YBKFO",
+    "YAQONWCUAU,31,QJNHZ",
+    "DJBXUEUOEB,35,IYCBL"
+    };
+
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "streamedtable", null);
+    String[] colNames1 = new String[] { "key1", "key2", "data" };
+    DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",",  endPt);
+    StreamingConnection connection = endPt.newConnection(false);
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, wr);
+    txnBatch.beginNextTransaction();
+
+    for (String record : records) {
+      txnBatch.write(record.toString().getBytes());
+    }
+
+    txnBatch.commit();
+    txnBatch.close();
+    connection.close();
+
+    ArrayList<String> res1 = queryTable(driver, "select row__id.bucketid, * from streamedtable order by key2");
+    for (String re : res1) {
+      System.out.println(re);
+    }
+
+    driver.run("insert into nobucket select row__id.bucketid,* from streamedtable");
+    runDDL(driver, " insert into finaltable select * from nobucket");
+    ArrayList<String> res2 = queryTable(driver, "select row__id.bucketid,* from finaltable where row__id.bucketid<>bucketid");
+    for (String s : res2) {
+      LOG.error(s);
+    }
+    Assert.assertTrue(res2.isEmpty());
+  }
+
+
+  private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
                                 String... records) throws Exception {
     ValidTxnList txns = msClient.getValidTxns();
-    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(partLocation), conf, txns);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns);
     Assert.assertEquals(0, dir.getObsolete().size());
     Assert.assertEquals(0, dir.getOriginalFiles().size());
     List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -197,7 +397,7 @@ public class TestStreaming {
     for (AcidUtils.ParsedDelta pd : current) System.out.println(pd.getPath().toString());
     Assert.assertEquals(numExpectedFiles, current.size());
 
-    // find the absolute mininum transaction
+    // find the absolute minimum transaction
     long min = Long.MAX_VALUE;
     long max = Long.MIN_VALUE;
     for (AcidUtils.ParsedDelta pd : current) {
@@ -209,11 +409,11 @@ public class TestStreaming {
 
     InputFormat inf = new OrcInputFormat();
     JobConf job = new JobConf();
-    job.set("mapred.input.dir", partLocation.toString());
+    job.set("mapred.input.dir", partitionPath.toString());
     job.set("bucket_count", Integer.toString(buckets));
     job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
-    InputSplit[] splits = inf.getSplits(job, 1);
-    Assert.assertEquals(1, splits.length);
+    InputSplit[] splits = inf.getSplits(job, buckets);
+    Assert.assertEquals(buckets, splits.length);
     org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
             inf.getRecordReader(splits[0], job, Reporter.NULL);
 
@@ -226,9 +426,9 @@ public class TestStreaming {
     Assert.assertEquals(false, rr.next(key, value));
   }
 
-  private void checkNothingWritten() throws Exception {
+  private void checkNothingWritten(Path partitionPath) throws Exception {
     ValidTxnList txns = msClient.getValidTxns();
-    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(partLocation), conf, txns);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns);
     Assert.assertEquals(0, dir.getObsolete().size());
     Assert.assertEquals(0, dir.getOriginalFiles().size());
     List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -398,7 +598,7 @@ public class TestStreaming {
     txnBatch.write("1,Hello streaming".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
             , txnBatch.getCurrentTransactionState());
@@ -410,11 +610,11 @@ public class TestStreaming {
     txnBatch.write("2,Welcome to streaming".getBytes());
 
     // data should not be visible
-    checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
     txnBatch.commit();
 
-    checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
         "{2, Welcome to streaming}");
 
     txnBatch.close();
@@ -459,7 +659,7 @@ public class TestStreaming {
     txnBatch.write(rec1.getBytes());
     txnBatch.commit();
 
-    checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
             , txnBatch.getCurrentTransactionState());
@@ -518,7 +718,7 @@ public class TestStreaming {
               , txnBatch.getCurrentTransactionState());
       ++batch;
     }
-    Assert.assertEquals(0,txnBatch.remainingTransactions());
+    Assert.assertEquals(0, txnBatch.remainingTransactions());
     txnBatch.close();
 
     Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
@@ -542,7 +742,7 @@ public class TestStreaming {
     txnBatch.write("2,Welcome to streaming".getBytes());
     txnBatch.abort();
 
-    checkNothingWritten();
+    checkNothingWritten(partLoc);
 
     Assert.assertEquals(TransactionBatch.TxnState.ABORTED
             , txnBatch.getCurrentTransactionState());
@@ -550,7 +750,7 @@ public class TestStreaming {
     txnBatch.close();
     connection.close();
 
-    checkNothingWritten();
+    checkNothingWritten(partLoc);
 
   }
 
@@ -569,7 +769,7 @@ public class TestStreaming {
     txnBatch.write("2,Welcome to streaming".getBytes());
     txnBatch.abort();
 
-    checkNothingWritten();
+    checkNothingWritten(partLoc);
 
     Assert.assertEquals(TransactionBatch.TxnState.ABORTED
             , txnBatch.getCurrentTransactionState());
@@ -579,8 +779,8 @@ public class TestStreaming {
     txnBatch.write("2,Welcome to streaming".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}",
-        "{2, Welcome to streaming}");
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+            "{2, Welcome to streaming}");
 
     txnBatch.close();
     connection.close();
@@ -598,14 +798,14 @@ public class TestStreaming {
     txnBatch.write("1,Hello streaming".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
     txnBatch.beginNextTransaction();
     txnBatch.write("2,Welcome to streaming".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}",
-        "{2, Welcome to streaming}");
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+            "{2, Welcome to streaming}");
 
     txnBatch.close();
 
@@ -615,16 +815,16 @@ public class TestStreaming {
     txnBatch.write("3,Hello streaming - once again".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}",
-        "{2, Welcome to streaming}", "{3, Hello streaming - once again}");
+    checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
+            "{2, Welcome to streaming}", "{3, Hello streaming - once again}");
 
     txnBatch.beginNextTransaction();
     txnBatch.write("4,Welcome to streaming - once again".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}",
-        "{2, Welcome to streaming}", "{3, Hello streaming - once again}",
-        "{4, Welcome to streaming - once again}");
+    checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
+            "{2, Welcome to streaming}", "{3, Hello streaming - once again}",
+            "{4, Welcome to streaming - once again}");
 
     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
             , txnBatch.getCurrentTransactionState());
@@ -655,15 +855,15 @@ public class TestStreaming {
     txnBatch1.write("1,Hello streaming".getBytes());
     txnBatch2.write("3,Hello streaming - once again".getBytes());
 
-    checkNothingWritten();
+    checkNothingWritten(partLoc);
 
     txnBatch2.commit();
 
-    checkDataWritten(11, 20, 1, 1, "{3, Hello streaming - once again}");
+    checkDataWritten(partLoc, 11, 20, 1, 1, "{3, Hello streaming - once again}");
 
     txnBatch1.commit();
 
-    checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+    checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
 
     txnBatch1.beginNextTransaction();
     txnBatch1.write("2,Welcome to streaming".getBytes());
@@ -671,17 +871,17 @@ public class TestStreaming {
     txnBatch2.beginNextTransaction();
     txnBatch2.write("4,Welcome to streaming - once again".getBytes());
 
-    checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+    checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
 
     txnBatch1.commit();
 
-    checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
         "{2, Welcome to streaming}",
         "{3, Hello streaming - once again}");
 
     txnBatch2.commit();
 
-    checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
         "{2, Welcome to streaming}",
         "{3, Hello streaming - once again}",
         "{4, Welcome to streaming - once again}");
@@ -772,6 +972,164 @@ public class TestStreaming {
     }
   }
 
+
+  private ArrayList<SampleRec> dumpBucket(Path orcFile) throws IOException {
+    org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.getLocal(new Configuration());
+    Reader reader = OrcFile.createReader(orcFile,
+            OrcFile.readerOptions(conf).filesystem(fs));
+
+    RecordReader rows = reader.rows(null);
+    StructObjectInspector inspector = (StructObjectInspector) reader
+            .getObjectInspector();
+
+    System.out.format("Found Bucket File : %s \n", orcFile.getName());
+    ArrayList<SampleRec> result = new ArrayList<SampleRec>();
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      SampleRec rec = (SampleRec) deserializeDeltaFileRow(row, inspector)[5];
+      result.add(rec);
+    }
+
+    return result;
+  }
+
+  // Assumes stored data schema = [acid fields],string,int,string
+  // return array of 6 fields, where the last field has the actual data
+  private static Object[] deserializeDeltaFileRow(Object row, StructObjectInspector inspector) {
+    List<? extends StructField> fields = inspector.getAllStructFieldRefs();
+
+    WritableIntObjectInspector f0ins = (WritableIntObjectInspector) fields.get(0).getFieldObjectInspector();
+    WritableLongObjectInspector f1ins = (WritableLongObjectInspector) fields.get(1).getFieldObjectInspector();
+    WritableIntObjectInspector f2ins = (WritableIntObjectInspector) fields.get(2).getFieldObjectInspector();
+    WritableLongObjectInspector f3ins = (WritableLongObjectInspector) fields.get(3).getFieldObjectInspector();
+    WritableLongObjectInspector f4ins = (WritableLongObjectInspector)  fields.get(4).getFieldObjectInspector();
+    StructObjectInspector f5ins = (StructObjectInspector) fields.get(5).getFieldObjectInspector();
+
+    int f0 = f0ins.get(inspector.getStructFieldData(row, fields.get(0)));
+    long f1 = f1ins.get(inspector.getStructFieldData(row, fields.get(1)));
+    int f2 = f2ins.get(inspector.getStructFieldData(row, fields.get(2)));
+    long f3 = f3ins.get(inspector.getStructFieldData(row, fields.get(3)));
+    long f4 = f4ins.get(inspector.getStructFieldData(row, fields.get(4)));
+    SampleRec f5 = deserializeInner(inspector.getStructFieldData(row, fields.get(5)), f5ins);
+
+    return new Object[] {f0, f1, f2, f3, f4, f5};
+  }
+
+  // Assumes row schema => string,int,string
+  private static SampleRec deserializeInner(Object row, StructObjectInspector inspector) {
+    List<? extends StructField> fields = inspector.getAllStructFieldRefs();
+
+    WritableStringObjectInspector f0ins = (WritableStringObjectInspector) fields.get(0).getFieldObjectInspector();
+    WritableIntObjectInspector f1ins = (WritableIntObjectInspector) fields.get(1).getFieldObjectInspector();
+    WritableStringObjectInspector f2ins = (WritableStringObjectInspector) fields.get(2).getFieldObjectInspector();
+
+    String f0 = f0ins.getPrimitiveJavaObject(inspector.getStructFieldData(row, fields.get(0)));
+    int f1 = f1ins.get(inspector.getStructFieldData(row, fields.get(1)));
+    String f2 = f2ins.getPrimitiveJavaObject(inspector.getStructFieldData(row, fields.get(2)));
+    return new SampleRec(f0, f1, f2);
+  }
+
+  @Test
+  public void testBucketing() throws Exception {
+    dropDB(msClient, dbName3);
+    dropDB(msClient, dbName4);
+
+    // 1) Create two bucketed tables
+    String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
+    dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
+    String[] colNames = "key1,key2,data".split(",");
+    String[] colTypes = "string,int,string".split(",");
+    String[] bucketNames = "key1,key2".split(",");
+    int bucketCount = 4;
+    createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames
+            , null, dbLocation, bucketCount);
+
+    String dbLocation2 = dbFolder.newFolder(dbName4).getCanonicalPath() + ".db";
+    dbLocation2 = dbLocation2.replaceAll("\\\\","/"); // for windows paths
+    String[] colNames2 = "key3,key4,data2".split(",");
+    String[] colTypes2 = "string,int,string".split(",");
+    String[] bucketNames2 = "key3,key4".split(",");
+    createDbAndTable(driver, dbName4, tblName4, null, colNames2, colTypes2, bucketNames2
+            , null, dbLocation2, bucketCount);
+
+
+    // 2) Insert data into both tables
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
+    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt);
+    StreamingConnection connection = endPt.newConnection(false);
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("name0,1,Hello streaming".getBytes());
+    txnBatch.write("name2,2,Welcome to streaming".getBytes());
+    txnBatch.write("name4,2,more Streaming unlimited".getBytes());
+    txnBatch.write("name5,2,even more Streaming unlimited".getBytes());
+    txnBatch.commit();
+
+
+    HiveEndPoint endPt2 = new HiveEndPoint(metaStoreURI, dbName4, tblName4, null);
+    DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2,",", endPt2);
+    StreamingConnection connection2 = endPt2.newConnection(false);
+    TransactionBatch txnBatch2 =  connection2.fetchTransactionBatch(2, writer2);
+    txnBatch2.beginNextTransaction();
+
+    txnBatch2.write("name5,2,fact3".getBytes());  // bucket 0
+    txnBatch2.write("name8,2,fact3".getBytes());  // bucket 1
+    txnBatch2.write("name0,1,fact1".getBytes());  // bucket 2
+    // no data for bucket 3 -- expect 0 length bucket file
+
+
+    txnBatch2.commit();
+
+    // 3 Check data distribution in  buckets
+
+    HashMap<Integer, ArrayList<SampleRec>> actual1 = dumpAllBuckets(dbLocation, tblName3);
+    HashMap<Integer, ArrayList<SampleRec>> actual2 = dumpAllBuckets(dbLocation2, tblName4);
+    System.err.println("\n  Table 1");
+    System.err.println(actual1);
+    System.err.println("\n  Table 2");
+    System.err.println(actual2);
+
+    // assert bucket listing is as expected
+    Assert.assertEquals("number of buckets does not match expectation", actual1.values().size(), 4);
+    Assert.assertEquals("records in bucket does not match expectation", actual1.get(0).size(), 2);
+    Assert.assertEquals("records in bucket does not match expectation", actual1.get(1).size(), 1);
+    Assert.assertEquals("records in bucket does not match expectation", actual1.get(2).size(), 0);
+    Assert.assertEquals("records in bucket does not match expectation", actual1.get(3).size(), 1);
+
+
+  }
+
+
+  // assumes unpartitioned table
+  // returns a map<bucketNum, list<record> >
+  private HashMap<Integer, ArrayList<SampleRec>> dumpAllBuckets(String dbLocation, String tableName)
+          throws IOException {
+    HashMap<Integer, ArrayList<SampleRec>> result = new HashMap<Integer, ArrayList<SampleRec>>();
+
+    for (File deltaDir : new File(dbLocation + "/" + tableName).listFiles()) {
+      if(!deltaDir.getName().startsWith("delta"))
+        continue;
+      File[] bucketFiles = deltaDir.listFiles();
+      for (File bucketFile : bucketFiles) {
+        if(bucketFile.toString().endsWith("length"))
+          continue;
+        Integer bucketNum = getBucketNumber(bucketFile);
+        ArrayList<SampleRec>  recs = dumpBucket(new Path(bucketFile.toString()));
+        result.put(bucketNum, recs);
+      }
+    }
+    return result;
+  }
+
+  //assumes bucket_NNNNN format of file name
+  private Integer getBucketNumber(File bucketFile) {
+    String fname = bucketFile.getName();
+    int start = fname.indexOf('_');
+    String number = fname.substring(start+1, fname.length());
+    return Integer.parseInt(number);
+  }
+
   // delete db and all tables in it
   public static void dropDB(IMetaStoreClient client, String databaseName) {
     try {
@@ -784,90 +1142,182 @@ public class TestStreaming {
 
   }
 
-  public void createDbAndTable(IMetaStoreClient client, String databaseName,
-                               String tableName, List<String> partVals)
+
+
+  ///////// -------- UTILS ------- /////////
+  // returns Path of the partition created (if any) else Path of table
+  public static Path createDbAndTable(Driver driver, String databaseName,
+                                      String tableName, List<String> partVals,
+                                      String[] colNames, String[] colTypes,
+                                      String[] bucketCols,
+                                      String[] partNames, String dbLocation, int bucketCount)
           throws Exception {
-    Database db = new Database();
-    db.setName(databaseName);
-    String dbLocation = "raw://" + dbFolder.newFolder(databaseName + ".db").toURI().getPath();
-    db.setLocationUri(dbLocation);
-    client.createDatabase(db);
-
-    Table tbl = new Table();
-    tbl.setDbName(databaseName);
-    tbl.setTableName(tableName);
-    tbl.setTableType(TableType.MANAGED_TABLE.toString());
-    StorageDescriptor sd = new StorageDescriptor();
-    sd.setCols(getTableColumns());
-    sd.setNumBuckets(1);
-    sd.setLocation(dbLocation + Path.SEPARATOR + tableName);
-    tbl.setPartitionKeys(getPartitionKeys());
-
-    tbl.setSd(sd);
-
-    sd.setBucketCols(new ArrayList<String>(2));
-    sd.setSerdeInfo(new SerDeInfo());
-    sd.getSerdeInfo().setName(tbl.getTableName());
-    sd.getSerdeInfo().setParameters(new HashMap<String, String>());
-    sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
-
-    sd.getSerdeInfo().setSerializationLib(OrcSerde.class.getName());
-    sd.setInputFormat(HiveInputFormat.class.getName());
-    sd.setOutputFormat(OrcOutputFormat.class.getName());
-
-    Map<String, String> tableParams = new HashMap<String, String>();
-    tbl.setParameters(tableParams);
-    client.createTable(tbl);
 
-    try {
-      addPartition(client, tbl, partVals);
-    } catch (AlreadyExistsException e) {
-    }
-    Partition createdPartition = client.getPartition(databaseName, tableName, partVals);
-    partLocation = createdPartition.getSd().getLocation();
-  }
-
-  private static void addPartition(IMetaStoreClient client, Table tbl
-          , List<String> partValues)
-          throws IOException, TException {
-    Partition part = new Partition();
-    part.setDbName(tbl.getDbName());
-    part.setTableName(tblName);
-    StorageDescriptor sd = new StorageDescriptor(tbl.getSd());
-    sd.setLocation(sd.getLocation() + Path.SEPARATOR + makePartPath(tbl.getPartitionKeys()
-            , partValues));
-    part.setSd(sd);
-    part.setValues(partValues);
-    client.add_partition(part);
-  }
-
-  private static String makePartPath(List<FieldSchema> partKeys, List<String> partVals) {
-    if (partKeys.size()!=partVals.size()) {
-      throw new IllegalArgumentException("Partition values:" + partVals
-              + ", does not match the partition Keys in table :" + partKeys );
-    }
-    StringBuilder buff = new StringBuilder(partKeys.size()*20);
-    buff.append(" ( ");
-    int i=0;
-    for (FieldSchema schema : partKeys) {
-      buff.append(schema.getName());
-      buff.append("='");
-      buff.append(partVals.get(i));
-      buff.append("'");
-      if (i!=partKeys.size()-1) {
-        buff.append(Path.SEPARATOR);
+    String dbUri = "raw://" + new Path(dbLocation).toUri().toString();
+    String tableLoc = dbUri + Path.SEPARATOR + tableName;
+
+    runDDL(driver, "create database IF NOT EXISTS " + databaseName + " location '" + dbUri + "'");
+    runDDL(driver, "use " + databaseName);
+    String crtTbl = "create table " + tableName +
+            " ( " +  getTableColumnsStr(colNames,colTypes) + " )" +
+            getPartitionStmtStr(partNames) +
+            " clustered by ( " + join(bucketCols, ",") + " )" +
+            " into " + bucketCount + " buckets " +
+            " stored as orc " +
+            " location '" + tableLoc +  "'";
+    runDDL(driver, crtTbl);
+    if(partNames!=null && partNames.length!=0) {
+      return addPartition(driver, tableName, partVals, partNames);
+    }
+    return new Path(tableLoc);
+  }
+
+  private static Path addPartition(Driver driver, String tableName, List<String> partVals, String[] partNames) throws QueryFailedException, CommandNeedRetryException, IOException {
+    String partSpec = getPartsSpec(partNames, partVals);
+    String addPart = "alter table " + tableName + " add partition ( " + partSpec  + " )";
+    runDDL(driver, addPart);
+    return getPartitionPath(driver, tableName, partSpec);
+  }
+
+  private static Path getPartitionPath(Driver driver, String tableName, String partSpec) throws CommandNeedRetryException, IOException {
+    ArrayList<String> res = queryTable(driver, "describe extended " + tableName + " PARTITION (" + partSpec + ")");
+    String partInfo = res.get(res.size() - 1);
+    int start = partInfo.indexOf("location:") + "location:".length();
+    int end = partInfo.indexOf(",",start);
+    return new Path( partInfo.substring(start,end) );
+  }
+
+  private static String getTableColumnsStr(String[] colNames, String[] colTypes) {
+    StringBuffer sb = new StringBuffer();
+    for (int i=0; i < colNames.length; ++i) {
+      sb.append(colNames[i] + " " + colTypes[i]);
+      if (i<colNames.length-1) {
+        sb.append(",");
       }
-      ++i;
     }
-    buff.append(" )");
-    return buff.toString();
+    return sb.toString();
   }
 
+  // converts partNames into "partName1 string, partName2 string"
+  private static String getTablePartsStr(String[] partNames) {
+    if (partNames==null || partNames.length==0) {
+      return "";
+    }
+    StringBuffer sb = new StringBuffer();
+    for (int i=0; i < partNames.length; ++i) {
+      sb.append(partNames[i] + " string");
+      if (i < partNames.length-1) {
+        sb.append(",");
+      }
+    }
+    return sb.toString();
+  }
 
-  private static List<FieldSchema> getTableColumns() {
-    List<FieldSchema> fields = new ArrayList<FieldSchema>();
-    fields.add(new FieldSchema(COL1, serdeConstants.INT_TYPE_NAME, ""));
-    fields.add(new FieldSchema(COL2, serdeConstants.STRING_TYPE_NAME, ""));
-    return fields;
+  // converts partNames,partVals into "partName1=val1, partName2=val2"
+  private static String getPartsSpec(String[] partNames, List<String> partVals) {
+    StringBuffer sb = new StringBuffer();
+    for (int i=0; i < partVals.size(); ++i) {
+      sb.append(partNames[i] + " = '" + partVals.get(i) + "'");
+      if(i < partVals.size()-1) {
+        sb.append(",");
+      }
+    }
+    return sb.toString();
+  }
+
+  private static String join(String[] values, String delimiter) {
+    if(values==null)
+      return null;
+    StringBuffer strbuf = new StringBuffer();
+
+    boolean first = true;
+
+    for (Object value : values)  {
+      if (!first) { strbuf.append(delimiter); } else { first = false; }
+      strbuf.append(value.toString());
+    }
+
+    return strbuf.toString();
+  }
+  private static String getPartitionStmtStr(String[] partNames) {
+    if ( partNames == null || partNames.length == 0) {
+      return "";
+    }
+    return " partitioned by (" + getTablePartsStr(partNames) + " )";
+  }
+
+  private static boolean runDDL(Driver driver, String sql) throws QueryFailedException {
+    LOG.debug(sql);
+    System.out.println(sql);
+    int retryCount = 1; // # of times to retry if first attempt fails
+    for (int attempt=0; attempt <= retryCount; ++attempt) {
+      try {
+        //LOG.debug("Running Hive Query: "+ sql);
+        CommandProcessorResponse cpr = driver.run(sql);
+        if(cpr.getResponseCode() == 0) {
+          return true;
+        }
+        LOG.error("Statement: " + sql + " failed: " + cpr);
+      } catch (CommandNeedRetryException e) {
+        if (attempt == retryCount) {
+          throw new QueryFailedException(sql, e);
+        }
+        continue;
+      }
+    } // for
+    return false;
+  }
+
+
+  public static ArrayList<String> queryTable(Driver driver, String query)
+          throws CommandNeedRetryException, IOException {
+    driver.run(query);
+    ArrayList<String> res = new ArrayList<String>();
+    driver.getResults(res);
+    if(res.isEmpty())
+      System.err.println(driver.getErrorMsg());
+    return res;
+  }
+
+  private static class SampleRec {
+    public String field1;
+    public int field2;
+    public String field3;
+
+    public SampleRec(String field1, int field2, String field3) {
+      this.field1 = field1;
+      this.field2 = field2;
+      this.field3 = field3;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      SampleRec that = (SampleRec) o;
+
+      if (field2 != that.field2) return false;
+      if (field1 != null ? !field1.equals(that.field1) : that.field1 != null) return false;
+      return !(field3 != null ? !field3.equals(that.field3) : that.field3 != null);
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = field1 != null ? field1.hashCode() : 0;
+      result = 31 * result + field2;
+      result = 31 * result + (field3 != null ? field3.hashCode() : 0);
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      return " { " +
+              "'" + field1 + '\'' +
+              "," + field2 +
+              ",'" + field3 + '\'' +
+              " }";
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0ca9ff86/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
index 00a6384..54ae48e 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
@@ -494,6 +494,35 @@ public final class ObjectInspectorUtils {
     }
   }
 
+  /**
+   * Computes the bucket number to which the bucketFields belong
+   * @param bucketFields  the bucketed fields of the row
+   * @param bucketFieldInspectors  the ObjectInspectors for each of the bucketed fields
+   * @param totalBuckets the number of buckets in the table
+   * @return the bucket number
+   */
+  public static int getBucketNumber(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors, int totalBuckets) {
+    int hashCode = getBucketHashCode(bucketFields, bucketFieldInspectors);
+    int bucketID = (hashCode & Integer.MAX_VALUE) % totalBuckets;
+    return bucketID;
+  }
+
+  /**
+   * Computes the hash code for the given bucketed fields
+   * @param bucketFields  the bucketed fields of the row
+   * @param bucketFieldInspectors  the ObjectInspectors for each of the bucketed fields
+   * @return the hash code computed from the bucketed fields
+   */
+  private static int getBucketHashCode(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors) {
+    int hashCode = 0;
+    for (int i = 0; i < bucketFields.length; i++) {
+      int fieldHash = ObjectInspectorUtils.hashCode(bucketFields[i], bucketFieldInspectors[i]);
+      hashCode = 31 * hashCode + fieldHash;
+    }
+    return hashCode;
+  }
+
+
   public static int hashCode(Object o, ObjectInspector objIns) {
     if (o == null) {
       return 0;
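
The new ObjectInspectorUtils.getBucketNumber() is what the streaming writers call: it folds the per-field hashes together with a 31-multiplier accumulation, clamps the result to a non-negative value, and reduces it modulo the bucket count, so streamed rows are assigned to the same buckets as a regular INSERT (which is what testStreamBucketingMatchesRegularBucketing above checks). A tiny self-contained sketch of just the arithmetic, with plain ints standing in for the ObjectInspector-derived field hashes:

  // Illustrative only: mirrors getBucketHashCode()/getBucketNumber() above,
  // but takes precomputed field hashes instead of ObjectInspectors.
  public class BucketNumberSketch {
    static int bucketNumber(int[] fieldHashes, int totalBuckets) {
      int hashCode = 0;
      for (int fieldHash : fieldHashes) {
        hashCode = 31 * hashCode + fieldHash;               // accumulate per-field hashes
      }
      return (hashCode & Integer.MAX_VALUE) % totalBuckets; // force non-negative, then modulo
    }

    public static void main(String[] args) {
      // Two bucketed columns with field hashes 7 and -3, table with 4 buckets:
      // 31 * 7 + (-3) = 214, and 214 % 4 = 2, so the row goes to bucket 2.
      System.out.println(bucketNumber(new int[]{7, -3}, 4)); // prints 2
    }
  }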