Posted to commits@hive.apache.org by xu...@apache.org on 2015/11/30 01:12:20 UTC

[83/91] [abbrv] hive git commit: HIVE-12307 - Streaming API TransactionBatch.close() must abort any remaining transactions in the batch (Eugene Koifman, reviewed by Alan Gates)

HIVE-12307 - Streaming API TransactionBatch.close() must abort any remaining transactions in the batch (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f1ac5a39
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f1ac5a39
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f1ac5a39

Branch: refs/heads/spark
Commit: f1ac5a391a18fccf724249038fca73e7b55854e2
Parents: 6d4dfa4
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Thu Nov 26 11:48:03 2015 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Thu Nov 26 11:48:29 2015 -0800

----------------------------------------------------------------------
 .../streaming/AbstractRecordWriter.java         |  32 ++-
 .../hcatalog/streaming/ConnectionError.java     |   3 +-
 .../streaming/DelimitedInputWriter.java         |   2 +-
 .../hive/hcatalog/streaming/HiveEndPoint.java   | 211 +++++++++++++------
 .../hcatalog/streaming/StrictJsonWriter.java    |   2 +-
 .../hcatalog/streaming/TransactionBatch.java    |   1 +
 .../hcatalog/streaming/TransactionError.java    |   2 +-
 .../hive/hcatalog/streaming/TestStreaming.java  | 167 +++++++++++++++
 .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java |   3 +
 9 files changed, 344 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
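
For context, the client-side flow this change affects looks roughly like the sketch below. It is only an illustration based on the usage exercised in the new TestStreaming test added by this commit; the metastore URI, database/table names and the sample record are placeholders, not part of the commit itself.

    // Minimal sketch of the hcatalog streaming API flow affected by HIVE-12307.
    // metaStoreURI, "testErrors", "T" and the record contents are illustrative only.
    void writeOneBatch(String metaStoreURI) throws Exception {
      HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testErrors", "T", null);
      DelimitedInputWriter writer = new DelimitedInputWriter("a,b".split(","), ",", endPt);
      StreamingConnection connection = endPt.newConnection(false);
      TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
      try {
        txnBatch.beginNextTransaction();
        txnBatch.write("1,2".getBytes());
        txnBatch.commit();
      } finally {
        // With this change, close() proactively aborts any transactions remaining in
        // the batch (instead of leaving them open until the metastore times them out)
        // and marks the batch closed so further operations fail fast.
        txnBatch.close();
        connection.close();
      }
    }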


http://git-wip-us.apache.org/repos/asf/hive/blob/f1ac5a39/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
index 5c15675..0c6b9ea 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
@@ -65,6 +65,8 @@ public abstract class AbstractRecordWriter implements RecordWriter {
 
   final AcidOutputFormat<?,?> outf;
   private Object[] bucketFieldData; // Pre-allocated in constructor. Updated on each write.
+  private Long curBatchMinTxnId;
+  private Long curBatchMaxTxnId;
 
   protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf)
           throws ConnectionError, StreamingException {
@@ -98,6 +100,12 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     }
   }
 
+  /**
+   * used to tag error msgs to provide some breadcrumbs
+   */
+  String getWatermark() {
+    return partitionPath + " txnIds[" + curBatchMinTxnId + "," + curBatchMaxTxnId + "]";
+  }
   // return the column numbers of the bucketed columns
   private List<Integer> getBucketColIDs(List<String> bucketCols, List<FieldSchema> cols) {
     ArrayList<Integer> result =  new ArrayList<Integer>(bucketCols.size());
@@ -164,22 +172,32 @@ public abstract class AbstractRecordWriter implements RecordWriter {
           throws StreamingIOFailure, SerializationError {
     try {
       LOG.debug("Creating Record updater");
+      curBatchMinTxnId = minTxnId;
+      curBatchMaxTxnId = maxTxnID;
       updaters = createRecordUpdaters(totalBuckets, minTxnId, maxTxnID);
     } catch (IOException e) {
-      LOG.error("Failed creating record updater", e);
-      throw new StreamingIOFailure("Unable to get new record Updater", e);
+      String errMsg = "Failed creating RecordUpdaterS for " + getWatermark();
+      LOG.error(errMsg, e);
+      throw new StreamingIOFailure(errMsg, e);
     }
   }
 
   @Override
   public void closeBatch() throws StreamingIOFailure {
-    try {
-      for (RecordUpdater updater : updaters) {
+    boolean haveError = false;
+    for (RecordUpdater updater : updaters) {
+      try {
+        //try not to leave any files open
         updater.close(false);
       }
-      updaters.clear();
-    } catch (IOException e) {
-      throw new StreamingIOFailure("Unable to close recordUpdater", e);
+      catch(Exception ex) {
+        haveError = true;
+        LOG.error("Unable to close " + updater + " due to: " + ex.getMessage(), ex);
+      }
+    }
+    updaters.clear();
+    if(haveError) {
+      throw new StreamingIOFailure("Encountered errors while closing (see logs) " + getWatermark());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/f1ac5a39/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java
index ffa51c9..03f6a44 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java
@@ -29,6 +29,7 @@ public class ConnectionError extends StreamingException {
   }
 
   public ConnectionError(HiveEndPoint endPoint, Exception innerEx) {
-    super("Error connecting to " + endPoint, innerEx);
+    super("Error connecting to " + endPoint +
+        (innerEx == null ? "" : ": " + innerEx.getMessage()), innerEx);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f1ac5a39/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
index 4f1154e..394cc54 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
@@ -243,7 +243,7 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
   }
 
   @Override
-  public SerDe getSerde() throws SerializationError {
+  public SerDe getSerde() {
     return serde;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/f1ac5a39/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 2f2d44a..4c77842 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -48,6 +48,7 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -503,7 +504,6 @@ public class HiveEndPoint {
 
 
   } // class ConnectionImpl
-
   private static class TransactionBatchImpl implements TransactionBatch {
     private final String username;
     private final UserGroupInformation ugi;
@@ -512,27 +512,28 @@ public class HiveEndPoint {
     private final RecordWriter recordWriter;
     private final List<Long> txnIds;
 
-    private int currentTxnIndex;
+    private int currentTxnIndex = -1;
     private final String partNameForLock;
 
     private TxnState state;
     private LockRequest lockRequest = null;
+    /**
+     * once any operation on this batch encounters a system exception
+     * (e.g. IOException on write) it's safest to assume that we can't write to the
+     * file backing this batch any more.  This guards important public methods
+     */
+    private volatile boolean isClosed = false;
 
     /**
      * Represents a batch of transactions acquired from MetaStore
      *
-     * @param user
-     * @param ugi
-     * @param endPt
-     * @param numTxns
-     * @param msClient
-     * @param recordWriter
      * @throws StreamingException if failed to create new RecordUpdater for batch
      * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
      */
     private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEndPoint endPt
               , final int numTxns, final IMetaStoreClient msClient, RecordWriter recordWriter)
             throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
+      boolean success = false;
       try {
         if ( endPt.partitionVals!=null   &&   !endPt.partitionVals.isEmpty() ) {
           Table tableObj = msClient.getTable(endPt.database, endPt.table);
@@ -549,15 +550,18 @@ public class HiveEndPoint {
 
         txnIds = openTxnImpl(msClient, user, numTxns, ugi);
 
-
-        this.currentTxnIndex = -1;
         this.state = TxnState.INACTIVE;
         recordWriter.newBatch(txnIds.get(0), txnIds.get(txnIds.size()-1));
+        success = true;
       } catch (TException e) {
         throw new TransactionBatchUnAvailable(endPt, e);
       } catch (IOException e) {
         throw new TransactionBatchUnAvailable(endPt, e);
       }
+      finally {
+        //clean up if above throws
+        markDead(success);
+      }
     }
 
     private List<Long> openTxnImpl(final IMetaStoreClient msClient, final String user, final int numTxns, UserGroupInformation ugi)
@@ -589,6 +593,7 @@ public class HiveEndPoint {
     @Override
     public void beginNextTransaction() throws TransactionError, ImpersonationFailed,
             InterruptedException {
+      checkIsClosed();
       if (ugi==null) {
         beginNextTransactionImpl();
         return;
@@ -610,10 +615,12 @@ public class HiveEndPoint {
     }
 
     private void beginNextTransactionImpl() throws TransactionError {
+      state = TxnState.INACTIVE;//clear state from previous txn
       if ( currentTxnIndex >= txnIds.size() )
         throw new InvalidTrasactionState("No more transactions available in" +
                 " current batch for end point : " + endPt);
       ++currentTxnIndex;
+      state = TxnState.OPEN;
       lockRequest = createLockRequest(endPt, partNameForLock, username, getCurrentTxnId());
       try {
         LockResponse res = msClient.lock(lockRequest);
@@ -623,8 +630,6 @@ public class HiveEndPoint {
       } catch (TException e) {
         throw new TransactionError("Unable to acquire lock on " + endPt, e);
       }
-
-      state = TxnState.OPEN;
     }
 
     /**
@@ -640,7 +645,7 @@ public class HiveEndPoint {
     }
 
     /**
-     * get state of current tramsaction
+     * get state of current transaction
      * @return
      */
     @Override
@@ -672,26 +677,35 @@ public class HiveEndPoint {
      */
     @Override
     public void write(final byte[] record)
-            throws StreamingException, InterruptedException,
-            ImpersonationFailed {
-      if (ugi==null) {
-        recordWriter.write(getCurrentTxnId(), record);
+            throws StreamingException, InterruptedException {
+      write(Collections.singletonList(record));
+    }
+    private void checkIsClosed() throws IllegalStateException {
+      if(isClosed) {
+        throw new IllegalStateException("TransactionBatch " + toString() + " has been closed()");
+      }
+    }
+    /**
+     * A transaction batch opens a single HDFS file and writes multiple transactions to it.  If there is any issue
+     * with the write, we can't continue to write to the same file any more as it may be corrupted now (at the tail).
+     * This ensures that a client can't ignore these failures and continue to write.
+     */
+    private void markDead(boolean success) {
+      if(success) {
         return;
       }
+      isClosed = true;//also ensures that heartbeat() is no-op since client is likely doing it async
       try {
-        ugi.doAs (
-            new PrivilegedExceptionAction<Void>() {
-              @Override
-              public Void run() throws StreamingException {
-                recordWriter.write(getCurrentTxnId(), record);
-                return null;
-              }
-            }
-        );
-      } catch (IOException e) {
-        throw new ImpersonationFailed("Failed wirting as user '" + username +
-                "' to endPoint :" + endPt + ". Transaction Id: "
-                + getCurrentTxnId(), e);
+        abort(true);//abort all remaining txns
+      }
+      catch(Exception ex) {
+        LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
+      }
+      try {
+        closeImpl();
+      }
+      catch (Exception ex) {
+        LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
       }
     }
 
@@ -707,24 +721,37 @@ public class HiveEndPoint {
     public void write(final Collection<byte[]> records)
             throws StreamingException, InterruptedException,
             ImpersonationFailed {
-      if (ugi==null) {
-        writeImpl(records);
-        return;
-      }
+      checkIsClosed();
+      boolean success = false;
       try {
-        ugi.doAs (
-                new PrivilegedExceptionAction<Void>() {
-                  @Override
-                  public Void run() throws StreamingException {
-                    writeImpl(records);
-                    return null;
-                  }
-                }
-        );
-      } catch (IOException e) {
+        if (ugi == null) {
+          writeImpl(records);
+        } else {
+          ugi.doAs(
+            new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws StreamingException {
+                writeImpl(records);
+                return null;
+              }
+            }
+          );
+        }
+        success = true;
+      } catch(SerializationError ex) {
+        //this exception indicates that a {@code record} could not be parsed and the
+        //caller can decide whether to drop it or send it to dead letter queue.
+        //rolling back the txn and retrying won't help since the tuple will be exactly the same
+        //when it's replayed.
+        success = true;
+        throw ex;
+      } catch(IOException e){
         throw new ImpersonationFailed("Failed writing as user '" + username +
-                "' to endPoint :" + endPt + ". Transaction Id: "
-                + getCurrentTxnId(), e);
+          "' to endPoint :" + endPt + ". Transaction Id: "
+          + getCurrentTxnId(), e);
+      }
+      finally {
+        markDead(success);
       }
     }
 
@@ -746,25 +773,31 @@ public class HiveEndPoint {
     @Override
     public void commit()  throws TransactionError, StreamingException,
            ImpersonationFailed, InterruptedException {
-      if (ugi==null) {
-        commitImpl();
-        return;
-      }
+      checkIsClosed();
+      boolean success = false;
       try {
-        ugi.doAs (
-              new PrivilegedExceptionAction<Void>() {
-                @Override
-                public Void run() throws StreamingException {
-                  commitImpl();
-                  return null;
-                }
+        if (ugi == null) {
+          commitImpl();
+        }
+        else {
+          ugi.doAs(
+            new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws StreamingException {
+                commitImpl();
+                return null;
               }
-        );
+            }
+          );
+        }
+        success = true;
       } catch (IOException e) {
         throw new ImpersonationFailed("Failed committing Txn ID " + getCurrentTxnId() + " as user '"
                 + username + "'on endPoint :" + endPt + ". Transaction Id: ", e);
       }
-
+      finally {
+        markDead(success);
+      }
     }
 
     private void commitImpl() throws TransactionError, StreamingException {
@@ -791,8 +824,20 @@ public class HiveEndPoint {
     @Override
     public void abort() throws TransactionError, StreamingException
                       , ImpersonationFailed, InterruptedException {
+      if(isClosed) {
+        /**
+         * isClosed is only set internally by this class.  {@link #markDead(boolean)} will abort all
+         * remaining txns, so make this a no-op to ensure that a well-behaved client that calls abort()
+         * doesn't get misleading errors
+         */
+        return;
+      }
+      abort(false);
+    }
+    private void abort(final boolean abortAllRemaining) throws TransactionError, StreamingException
+        , ImpersonationFailed, InterruptedException {
       if (ugi==null) {
-        abortImpl();
+        abortImpl(abortAllRemaining);
         return;
       }
       try {
@@ -800,7 +845,7 @@ public class HiveEndPoint {
                 new PrivilegedExceptionAction<Void>() {
                   @Override
                   public Void run() throws StreamingException {
-                    abortImpl();
+                    abortImpl(abortAllRemaining);
                     return null;
                   }
                 }
@@ -811,11 +856,26 @@ public class HiveEndPoint {
       }
     }
 
-    private void abortImpl() throws TransactionError, StreamingException {
+    private void abortImpl(boolean abortAllRemaining) throws TransactionError, StreamingException {
       try {
-        recordWriter.clear();
-        msClient.rollbackTxn(getCurrentTxnId());
+        if(abortAllRemaining) {
+          //when the last txn finished (abort/commit) currentTxnIndex is pointing at that txn
+          //so we need to start from the next one, if any.  Also, if the batch was created but
+          //beginNextTransaction() was never called, we want to start with the first txn
+          int minOpenTxnIndex = Math.max(currentTxnIndex +
+            (state == TxnState.ABORTED || state == TxnState.COMMITTED ? 1 : 0), 0);
+          for(currentTxnIndex = minOpenTxnIndex;
+              currentTxnIndex < txnIds.size(); currentTxnIndex++) {
+            msClient.rollbackTxn(txnIds.get(currentTxnIndex));
+          }
+        }
+        else {
+          if (getCurrentTxnId() > 0) {
+            msClient.rollbackTxn(getCurrentTxnId());
+          }
+        }
         state = TxnState.ABORTED;
+        recordWriter.clear();
       } catch (NoSuchTxnException e) {
         throw new TransactionError("Unable to abort invalid transaction id : "
                 + getCurrentTxnId(), e);
@@ -827,6 +887,9 @@ public class HiveEndPoint {
 
     @Override
     public void heartbeat() throws StreamingException, HeartBeatFailure {
+      if(isClosed) {
+        return;
+      }
       Long first = txnIds.get(currentTxnIndex);
       Long last = txnIds.get(txnIds.size()-1);
       try {
@@ -840,14 +903,27 @@ public class HiveEndPoint {
       }
     }
 
+    @Override
+    public boolean isClosed() {
+      return isClosed;
+    }
     /**
-     * Close the TransactionBatch
+     * Close the TransactionBatch.  This will abort any still open txns in this batch.
      * @throws StreamingIOFailure I/O failure when closing transaction batch
      */
     @Override
     public void close() throws StreamingException, ImpersonationFailed, InterruptedException {
-      if (ugi==null) {
-        state = TxnState.INACTIVE;
+      if(isClosed) {
+        return;
+      }
+      isClosed = true;
+      abortImpl(true);//abort proactively so that we don't wait for timeout
+      closeImpl();//perhaps we should add a version of RecordWriter.closeBatch(boolean abort) which
+      //will call RecordUpdater.close(boolean abort)
+    }
+    private void closeImpl() throws StreamingException, InterruptedException{
+      state = TxnState.INACTIVE;
+      if(ugi == null) {
         recordWriter.closeBatch();
         return;
       }
@@ -856,7 +932,6 @@ public class HiveEndPoint {
                 new PrivilegedExceptionAction<Void>() {
                   @Override
                   public Void run() throws StreamingException {
-                    state = TxnState.INACTIVE;
                     recordWriter.closeBatch();
                     return null;
                   }
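
The net effect of the HiveEndPoint changes is a stricter error-handling contract: a SerializationError leaves the batch usable (the record itself is bad, so retrying the same tuple cannot help), while any other failure marks the batch dead, aborts its remaining transactions and makes subsequent write()/commit()/beginNextTransaction() calls throw IllegalStateException. A hedged sketch of how a client might react, assuming txnBatch, connection, writer and record are set up as in the earlier sketch (the batch size of 10 is arbitrary):

    try {
      txnBatch.write(record);
    } catch (SerializationError e) {
      // The record could not be parsed; the batch is still good, so drop the record
      // (or route it to a dead-letter store) and keep writing.
    } catch (StreamingException e) {
      // Any other failure marks the whole batch dead; its remaining txns have already
      // been aborted, so fetch a fresh batch before continuing.
      if (txnBatch.isClosed()) {
        txnBatch = connection.fetchTransactionBatch(10, writer);
      }
    }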

http://git-wip-us.apache.org/repos/asf/hive/blob/f1ac5a39/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
index 28ea7d6..db73d6b 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
@@ -86,7 +86,7 @@ public class StrictJsonWriter extends AbstractRecordWriter {
   }
 
   @Override
-  public SerDe getSerde() throws SerializationError {
+  public SerDe getSerde() {
     return serde;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/f1ac5a39/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java
index d9a083d..3c8670d 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java
@@ -100,4 +100,5 @@ public interface TransactionBatch  {
    * @throws InterruptedException if call in interrupted
    */
   public void close() throws StreamingException, InterruptedException;
+  public boolean isClosed();
 }
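
Since heartbeat() is now a no-op on a closed batch, a client that heartbeats from a separate thread (the common pattern, as the markDead() comment above notes) no longer races with a writer thread that hit a fatal error. A rough sketch only, with the executor and schedule period purely illustrative:

    ScheduledExecutorService heartbeater = Executors.newSingleThreadScheduledExecutor();
    heartbeater.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        try {
          // Safe even after a fatal write error: heartbeat() simply returns once the
          // batch has been marked closed, and isClosed() lets us stop early.
          if (!txnBatch.isClosed()) {
            txnBatch.heartbeat();
          }
        } catch (StreamingException e) {
          // log and let the writer thread discover the failure on its next call
        }
      }
    }, 30, 30, TimeUnit.SECONDS);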

http://git-wip-us.apache.org/repos/asf/hive/blob/f1ac5a39/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java
index dd9c83d..198d077 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java
@@ -20,7 +20,7 @@ package org.apache.hive.hcatalog.streaming;
 
 public class TransactionError extends StreamingException {
   public TransactionError(String msg, Exception e) {
-    super(msg, e);
+    super(msg + (e == null ? "" : ": " + e.getMessage()), e);
   }
 
   public TransactionError(String msg) {

http://git-wip-us.apache.org/repos/asf/hive/blob/f1ac5a39/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 1723ff1..d38cdc0 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -29,11 +29,14 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnInfo;
+import org.apache.hadoop.hive.metastore.api.TxnState;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
@@ -1189,7 +1192,120 @@ public class TestStreaming {
 
 
   }
+  private void runCmdOnDriver(String cmd) throws QueryFailedException {
+    boolean t = runDDL(driver, cmd);
+    Assert.assertTrue(cmd + " failed", t);
+  }
+  
+
+  @Test
+  public void testErrorHandling() throws Exception {
+    runCmdOnDriver("create database testErrors");
+    runCmdOnDriver("use testErrors");
+    runCmdOnDriver("create table T(a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testErrors", "T", null);
+    DelimitedInputWriter innerWriter = new DelimitedInputWriter("a,b".split(","),",", endPt);
+    FaultyWriter writer = new FaultyWriter(innerWriter);
+    StreamingConnection connection = endPt.newConnection(false);
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
+    txnBatch.close();
+    txnBatch.heartbeat();//this is no-op on closed batch
+    txnBatch.abort();//ditto
+    GetOpenTxnsInfoResponse r = msClient.showTxns();
+    Assert.assertEquals("HWM didn't match", 2, r.getTxn_high_water_mark());
+    List<TxnInfo> ti = r.getOpen_txns();
+    Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
+    Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
+
+    Exception expectedEx = null;
+    try {
+      txnBatch.beginNextTransaction();
+    }
+    catch(IllegalStateException ex) {
+      expectedEx = ex;
+    }
+    Assert.assertTrue("beginNextTransaction() should have failed",
+      expectedEx != null && expectedEx.getMessage().contains("has been closed()"));
+    expectedEx = null;
+    try {
+      txnBatch.write("name0,1,Hello streaming".getBytes());
+    }
+    catch(IllegalStateException ex) {
+      expectedEx = ex;
+    }
+    Assert.assertTrue("write()  should have failed",
+      expectedEx != null && expectedEx.getMessage().contains("has been closed()"));
+    expectedEx = null;
+    try {
+      txnBatch.commit();
+    }
+    catch(IllegalStateException ex) {
+      expectedEx = ex;
+    }
+    Assert.assertTrue("commit() should have failed",
+      expectedEx != null && expectedEx.getMessage().contains("has been closed()"));
+
+    txnBatch =  connection.fetchTransactionBatch(2, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("name2,2,Welcome to streaming".getBytes());
+    txnBatch.write("name4,2,more Streaming unlimited".getBytes());
+    txnBatch.write("name5,2,even more Streaming unlimited".getBytes());
+    txnBatch.commit();
+    
+    expectedEx = null;
+    txnBatch.beginNextTransaction();
+    writer.enableErrors();
+    try {
+      txnBatch.write("name6,2,Doh!".getBytes());
+    }
+    catch(StreamingIOFailure ex) {
+      expectedEx = ex;
+    }
+    Assert.assertTrue("Wrong exception: " + (expectedEx != null ? expectedEx.getMessage() : "?"),
+      expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred"));
+    expectedEx = null;
+    try {
+      txnBatch.commit();
+    }
+    catch(IllegalStateException ex) {
+      expectedEx = ex;
+    }
+    Assert.assertTrue("commit() should have failed",
+      expectedEx != null && expectedEx.getMessage().contains("has been closed()"));
+
+    r = msClient.showTxns();
+    Assert.assertEquals("HWM didn't match", 4, r.getTxn_high_water_mark());
+    ti = r.getOpen_txns();
+    Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
+    Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
+    //txnid 3 was committed and thus not open
+    Assert.assertEquals("wrong status ti(2)", TxnState.ABORTED, ti.get(2).getState());
+
+    writer.disableErrors();
+    txnBatch =  connection.fetchTransactionBatch(2, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("name2,2,Welcome to streaming".getBytes());
+    writer.enableErrors();
+    expectedEx = null;
+    try {
+      txnBatch.commit();
+    }
+    catch(StreamingIOFailure ex) {
+      expectedEx = ex;
+    }
+    Assert.assertTrue("Wrong exception: " + (expectedEx != null ? expectedEx.getMessage() : "?"),
+      expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred"));
+    
+    r = msClient.showTxns();
+    Assert.assertEquals("HWM didn't match", 6, r.getTxn_high_water_mark());
+    ti = r.getOpen_txns();
+    Assert.assertEquals("wrong status ti(3)", TxnState.ABORTED, ti.get(3).getState());
+    Assert.assertEquals("wrong status ti(4)", TxnState.ABORTED, ti.get(4).getState());
 
+    txnBatch.abort();
+  }
 
     // assumes un partitioned table
   // returns a map<bucketNum, list<record> >
@@ -1411,4 +1527,55 @@ public class TestStreaming {
               " }";
     }
   }
+  /**
+   * This is test-only wrapper around the real RecordWriter.
+   * It can simulate faults from lower levels to test error handling logic.
+   */
+  private static final class FaultyWriter implements RecordWriter {
+    private final RecordWriter delegate;
+    private boolean shouldThrow = false;
+
+    private FaultyWriter(RecordWriter delegate) {
+      assert delegate != null;
+      this.delegate = delegate;
+    }
+    @Override
+    public void write(long transactionId, byte[] record) throws StreamingException {
+      delegate.write(transactionId, record);
+      produceFault();
+    }
+    @Override
+    public void flush() throws StreamingException {
+      delegate.flush();
+      produceFault();
+    }
+    @Override
+    public void clear() throws StreamingException {
+      delegate.clear();
+    }
+    @Override
+    public void newBatch(Long minTxnId, Long maxTxnID) throws StreamingException {
+      delegate.newBatch(minTxnId, maxTxnID);
+    }
+    @Override
+    public void closeBatch() throws StreamingException {
+      delegate.closeBatch();
+    }
+
+    /**
+     * allows testing of "unexpected" errors
+     * @throws StreamingIOFailure
+     */
+    private void produceFault() throws StreamingIOFailure {
+      if(shouldThrow) {
+        throw new StreamingIOFailure("Simulated fault occurred");
+      }
+    }
+    void enableErrors() {
+      shouldThrow = true;
+    }
+    void disableErrors() {
+      shouldThrow = false;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f1ac5a39/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index ee31c23..9098e84 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -268,6 +268,9 @@ public class OrcRecordUpdater implements RecordUpdater {
     item.setFieldValue(ROW_ID, rowId);
   }
 
+  public String toString() {
+    return getClass().getName() + "[" + path +"]";
+  }
   /**
    * To handle multiple INSERT... statements in a single transaction, we want to make sure
    * to generate unique {@code rowId} for all inserted rows of the transaction.