Posted to commits@hive.apache.org by xu...@apache.org on 2015/11/30 01:12:20 UTC
[83/91] [abbrv] hive git commit: HIVE-12307 - Streaming API
TransactionBatch.close() must abort any remaining transactions in the
batch (Eugene Koifman, reviewed by Alan Gates)
HIVE-12307 - Streaming API TransactionBatch.close() must abort any remaining transactions in the batch (Eugene Koifman, reviewed by Alan Gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f1ac5a39
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f1ac5a39
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f1ac5a39
Branch: refs/heads/spark
Commit: f1ac5a391a18fccf724249038fca73e7b55854e2
Parents: 6d4dfa4
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Thu Nov 26 11:48:03 2015 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Thu Nov 26 11:48:29 2015 -0800
----------------------------------------------------------------------
.../streaming/AbstractRecordWriter.java | 32 ++-
.../hcatalog/streaming/ConnectionError.java | 3 +-
.../streaming/DelimitedInputWriter.java | 2 +-
.../hive/hcatalog/streaming/HiveEndPoint.java | 211 +++++++++++++------
.../hcatalog/streaming/StrictJsonWriter.java | 2 +-
.../hcatalog/streaming/TransactionBatch.java | 1 +
.../hcatalog/streaming/TransactionError.java | 2 +-
.../hive/hcatalog/streaming/TestStreaming.java | 167 +++++++++++++++
.../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 3 +
9 files changed, 344 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
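For orientation, here is a minimal sketch of the client flow this patch hardens, using the endpoint and writer set up the same way as in the TestStreaming changes below (error handling trimmed; the metastore URI is assumed to be available as metaStoreURI). The key point is close(): after this change it proactively aborts any transactions still open or unused in the batch instead of leaving them to time out, and all subsequent operations on the batch fail fast.

    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testErrors", "T", null);
    DelimitedInputWriter writer = new DelimitedInputWriter("a,b".split(","), ",", endPt);
    StreamingConnection connection = endPt.newConnection(false);
    TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
    try {
      txnBatch.beginNextTransaction();
      txnBatch.write("1,2".getBytes());
      txnBatch.commit();
    } finally {
      txnBatch.close();   // aborts all remaining txns in the batch (this patch)
      connection.close(); // heartbeat()/abort() on the closed batch are no-ops
    }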
http://git-wip-us.apache.org/repos/asf/hive/blob/f1ac5a39/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
index 5c15675..0c6b9ea 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
@@ -65,6 +65,8 @@ public abstract class AbstractRecordWriter implements RecordWriter {
final AcidOutputFormat<?,?> outf;
private Object[] bucketFieldData; // Pre-allocated in constructor. Updated on each write.
+ private Long curBatchMinTxnId;
+ private Long curBatchMaxTxnId;
protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf)
throws ConnectionError, StreamingException {
@@ -98,6 +100,12 @@ public abstract class AbstractRecordWriter implements RecordWriter {
}
}
+ /**
+ * used to tag error msgs to provide some breadcrumbs
+ */
+ String getWatermark() {
+ return partitionPath + " txnIds[" + curBatchMinTxnId + "," + curBatchMaxTxnId + "]";
+ }
// return the column numbers of the bucketed columns
private List<Integer> getBucketColIDs(List<String> bucketCols, List<FieldSchema> cols) {
ArrayList<Integer> result = new ArrayList<Integer>(bucketCols.size());
@@ -164,22 +172,32 @@ public abstract class AbstractRecordWriter implements RecordWriter {
throws StreamingIOFailure, SerializationError {
try {
LOG.debug("Creating Record updater");
+ curBatchMinTxnId = minTxnId;
+ curBatchMaxTxnId = maxTxnID;
updaters = createRecordUpdaters(totalBuckets, minTxnId, maxTxnID);
} catch (IOException e) {
- LOG.error("Failed creating record updater", e);
- throw new StreamingIOFailure("Unable to get new record Updater", e);
+ String errMsg = "Failed creating RecordUpdaters for " + getWatermark();
+ LOG.error(errMsg, e);
+ throw new StreamingIOFailure(errMsg, e);
}
}
@Override
public void closeBatch() throws StreamingIOFailure {
- try {
- for (RecordUpdater updater : updaters) {
+ boolean haveError = false;
+ for (RecordUpdater updater : updaters) {
+ try {
+ //try not to leave any files open
updater.close(false);
}
- updaters.clear();
- } catch (IOException e) {
- throw new StreamingIOFailure("Unable to close recordUpdater", e);
+ catch(Exception ex) {
+ haveError = true;
+ LOG.error("Unable to close " + updater + " due to: " + ex.getMessage(), ex);
+ }
+ }
+ updaters.clear();
+ if(haveError) {
+ throw new StreamingIOFailure("Encountered errors while closing (see logs) " + getWatermark());
}
}
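The closeBatch() rewrite above replaces fail-on-first-error with best-effort cleanup: every RecordUpdater gets a close attempt, each failure is logged, and a single StreamingIOFailure is thrown at the end, so one bad bucket file no longer leaves the remaining files open. A generic sketch of the same pattern in plain JDK terms (not part of this patch):

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.List;

    final class BestEffortClose {
      // Try to close every resource; remember failures and throw once at the
      // end, mirroring the haveError flag in closeBatch() above.
      static void closeAll(List<? extends Closeable> resources) throws IOException {
        IOException first = null;
        for (Closeable c : resources) {
          try {
            c.close();
          } catch (IOException e) {
            if (first == null) { first = e; } else { first.addSuppressed(e); }
          }
        }
        resources.clear();
        if (first != null) { throw first; }
      }
    }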
http://git-wip-us.apache.org/repos/asf/hive/blob/f1ac5a39/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java
index ffa51c9..03f6a44 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java
@@ -29,6 +29,7 @@ public class ConnectionError extends StreamingException {
}
public ConnectionError(HiveEndPoint endPoint, Exception innerEx) {
- super("Error connecting to " + endPoint, innerEx);
+ super("Error connecting to " + endPoint +
+ (innerEx == null ? "" : ": " + innerEx.getMessage()), innerEx);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f1ac5a39/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
index 4f1154e..394cc54 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
@@ -243,7 +243,7 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
}
@Override
- public SerDe getSerde() throws SerializationError {
+ public SerDe getSerde() {
return serde;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f1ac5a39/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 2f2d44a..4c77842 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -48,6 +48,7 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -503,7 +504,6 @@ public class HiveEndPoint {
} // class ConnectionImpl
-
private static class TransactionBatchImpl implements TransactionBatch {
private final String username;
private final UserGroupInformation ugi;
@@ -512,27 +512,28 @@ public class HiveEndPoint {
private final RecordWriter recordWriter;
private final List<Long> txnIds;
- private int currentTxnIndex;
+ private int currentTxnIndex = -1;
private final String partNameForLock;
private TxnState state;
private LockRequest lockRequest = null;
+ /**
+ * Once any operation on this batch encounters a system exception
+ * (e.g. IOException on write) it's safest to assume that we can't write to the
+ * file backing this batch any more. This flag guards the important public methods.
+ */
+ private volatile boolean isClosed = false;
/**
* Represents a batch of transactions acquired from MetaStore
*
- * @param user
- * @param ugi
- * @param endPt
- * @param numTxns
- * @param msClient
- * @param recordWriter
* @throws StreamingException if failed to create new RecordUpdater for batch
* @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
*/
private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEndPoint endPt
, final int numTxns, final IMetaStoreClient msClient, RecordWriter recordWriter)
throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
+ boolean success = false;
try {
if ( endPt.partitionVals!=null && !endPt.partitionVals.isEmpty() ) {
Table tableObj = msClient.getTable(endPt.database, endPt.table);
@@ -549,15 +550,18 @@ public class HiveEndPoint {
txnIds = openTxnImpl(msClient, user, numTxns, ugi);
-
- this.currentTxnIndex = -1;
this.state = TxnState.INACTIVE;
recordWriter.newBatch(txnIds.get(0), txnIds.get(txnIds.size()-1));
+ success = true;
} catch (TException e) {
throw new TransactionBatchUnAvailable(endPt, e);
} catch (IOException e) {
throw new TransactionBatchUnAvailable(endPt, e);
}
+ finally {
+ //clean up if above throws
+ markDead(success);
+ }
}
private List<Long> openTxnImpl(final IMetaStoreClient msClient, final String user, final int numTxns, UserGroupInformation ugi)
@@ -589,6 +593,7 @@ public class HiveEndPoint {
@Override
public void beginNextTransaction() throws TransactionError, ImpersonationFailed,
InterruptedException {
+ checkIsClosed();
if (ugi==null) {
beginNextTransactionImpl();
return;
@@ -610,10 +615,12 @@ public class HiveEndPoint {
}
private void beginNextTransactionImpl() throws TransactionError {
+ state = TxnState.INACTIVE;//clear state from previous txn
if ( currentTxnIndex >= txnIds.size() )
throw new InvalidTrasactionState("No more transactions available in" +
" current batch for end point : " + endPt);
++currentTxnIndex;
+ state = TxnState.OPEN;
lockRequest = createLockRequest(endPt, partNameForLock, username, getCurrentTxnId());
try {
LockResponse res = msClient.lock(lockRequest);
@@ -623,8 +630,6 @@ public class HiveEndPoint {
} catch (TException e) {
throw new TransactionError("Unable to acquire lock on " + endPt, e);
}
-
- state = TxnState.OPEN;
}
/**
@@ -640,7 +645,7 @@ public class HiveEndPoint {
}
/**
- * get state of current tramsaction
+ * get state of current transaction
* @return
*/
@Override
@@ -672,26 +677,35 @@ public class HiveEndPoint {
*/
@Override
public void write(final byte[] record)
- throws StreamingException, InterruptedException,
- ImpersonationFailed {
- if (ugi==null) {
- recordWriter.write(getCurrentTxnId(), record);
+ throws StreamingException, InterruptedException {
+ write(Collections.singletonList(record));
+ }
+ private void checkIsClosed() throws IllegalStateException {
+ if(isClosed) {
+ throw new IllegalStateException("TransactionBatch " + toString() + " has been closed()");
+ }
+ }
+ /**
+ * A transaction batch opens a single HDFS file and writes multiple transactions to it. If there is any issue
+ * with the write, we can't continue writing to the same file, as it may now be corrupted (at the tail).
+ * This ensures that a client can't ignore these failures and continue to write.
+ */
+ private void markDead(boolean success) {
+ if(success) {
return;
}
+ isClosed = true;//also ensures that heartbeat() is a no-op since the client is likely calling it async
try {
- ugi.doAs (
- new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws StreamingException {
- recordWriter.write(getCurrentTxnId(), record);
- return null;
- }
- }
- );
- } catch (IOException e) {
- throw new ImpersonationFailed("Failed wirting as user '" + username +
- "' to endPoint :" + endPt + ". Transaction Id: "
- + getCurrentTxnId(), e);
+ abort(true);//abort all remaining txns
+ }
+ catch(Exception ex) {
+ LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
+ }
+ try {
+ closeImpl();
+ }
+ catch (Exception ex) {
+ LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
}
}
@@ -707,24 +721,37 @@ public class HiveEndPoint {
public void write(final Collection<byte[]> records)
throws StreamingException, InterruptedException,
ImpersonationFailed {
- if (ugi==null) {
- writeImpl(records);
- return;
- }
+ checkIsClosed();
+ boolean success = false;
try {
- ugi.doAs (
- new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws StreamingException {
- writeImpl(records);
- return null;
- }
- }
- );
- } catch (IOException e) {
+ if (ugi == null) {
+ writeImpl(records);
+ } else {
+ ugi.doAs(
+ new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws StreamingException {
+ writeImpl(records);
+ return null;
+ }
+ }
+ );
+ }
+ success = true;
+ } catch(SerializationError ex) {
+ //this exception indicates that a {@code record} could not be parsed and the
+ //caller can decide whether to drop it or send it to a dead letter queue.
+ //rolling back the txn and retrying won't help since the tuple will be exactly the same
+ //when it's replayed.
+ success = true;
+ throw ex;
+ } catch(IOException e){
throw new ImpersonationFailed("Failed writing as user '" + username +
- "' to endPoint :" + endPt + ". Transaction Id: "
- + getCurrentTxnId(), e);
+ "' to endPoint :" + endPt + ". Transaction Id: "
+ + getCurrentTxnId(), e);
+ }
+ finally {
+ markDead(success);
}
}
@@ -746,25 +773,31 @@ public class HiveEndPoint {
@Override
public void commit() throws TransactionError, StreamingException,
ImpersonationFailed, InterruptedException {
- if (ugi==null) {
- commitImpl();
- return;
- }
+ checkIsClosed();
+ boolean success = false;
try {
- ugi.doAs (
- new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws StreamingException {
- commitImpl();
- return null;
- }
+ if (ugi == null) {
+ commitImpl();
+ }
+ else {
+ ugi.doAs(
+ new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws StreamingException {
+ commitImpl();
+ return null;
}
- );
+ }
+ );
+ }
+ success = true;
} catch (IOException e) {
throw new ImpersonationFailed("Failed committing Txn ID " + getCurrentTxnId() + " as user '"
+ username + "'on endPoint :" + endPt + ". Transaction Id: ", e);
}
-
+ finally {
+ markDead(success);
+ }
}
private void commitImpl() throws TransactionError, StreamingException {
@@ -791,8 +824,20 @@ public class HiveEndPoint {
@Override
public void abort() throws TransactionError, StreamingException
, ImpersonationFailed, InterruptedException {
+ if(isClosed) {
+ /**
+ * isClosed is only set internally by this class. {@link #markDead(boolean)} will abort all
+ * remaining txns, so make this a no-op to ensure that a well-behaved client that calls abort()
+ * after an error doesn't get misleading errors
+ */
+ return;
+ }
+ abort(false);
+ }
+ private void abort(final boolean abortAllRemaining) throws TransactionError, StreamingException
+ , ImpersonationFailed, InterruptedException {
if (ugi==null) {
- abortImpl();
+ abortImpl(abortAllRemaining);
return;
}
try {
@@ -800,7 +845,7 @@ public class HiveEndPoint {
new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws StreamingException {
- abortImpl();
+ abortImpl(abortAllRemaining);
return null;
}
}
@@ -811,11 +856,26 @@ public class HiveEndPoint {
}
}
- private void abortImpl() throws TransactionError, StreamingException {
+ private void abortImpl(boolean abortAllRemaining) throws TransactionError, StreamingException {
try {
- recordWriter.clear();
- msClient.rollbackTxn(getCurrentTxnId());
+ if(abortAllRemaining) {
+ //when the last txn finished (abort/commit) currentTxnIndex is pointing at that txn,
+ //so we need to start from the next one, if any. Also, if the batch was created but
+ //beginNextTransaction() was never called, we want to start with the first txn
+ int minOpenTxnIndex = Math.max(currentTxnIndex +
+ (state == TxnState.ABORTED || state == TxnState.COMMITTED ? 1 : 0), 0);
+ for(currentTxnIndex = minOpenTxnIndex;
+ currentTxnIndex < txnIds.size(); currentTxnIndex++) {
+ msClient.rollbackTxn(txnIds.get(currentTxnIndex));
+ }
+ }
+ else {
+ if (getCurrentTxnId() > 0) {
+ msClient.rollbackTxn(getCurrentTxnId());
+ }
+ }
state = TxnState.ABORTED;
+ recordWriter.clear();
} catch (NoSuchTxnException e) {
throw new TransactionError("Unable to abort invalid transaction id : "
+ getCurrentTxnId(), e);
@@ -827,6 +887,9 @@ public class HiveEndPoint {
@Override
public void heartbeat() throws StreamingException, HeartBeatFailure {
+ if(isClosed) {
+ return;
+ }
Long first = txnIds.get(currentTxnIndex);
Long last = txnIds.get(txnIds.size()-1);
try {
@@ -840,14 +903,27 @@ public class HiveEndPoint {
}
}
+ @Override
+ public boolean isClosed() {
+ return isClosed;
+ }
/**
- * Close the TransactionBatch
+ * Close the TransactionBatch. This will abort any still open txns in this batch.
* @throws StreamingIOFailure I/O failure when closing transaction batch
*/
@Override
public void close() throws StreamingException, ImpersonationFailed, InterruptedException {
- if (ugi==null) {
- state = TxnState.INACTIVE;
+ if(isClosed) {
+ return;
+ }
+ isClosed = true;
+ abortImpl(true);//abort proactively so that we don't wait for timeout
+ closeImpl();//perhaps we should add a version of RecordWriter.closeBatch(boolean abort) which
+ //will call RecordUpdater.close(boolean abort)
+ }
+ private void closeImpl() throws StreamingException, InterruptedException{
+ state = TxnState.INACTIVE;
+ if(ugi == null) {
recordWriter.closeBatch();
return;
}
@@ -856,7 +932,6 @@ public class HiveEndPoint {
new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws StreamingException {
- state = TxnState.INACTIVE;
recordWriter.closeBatch();
return null;
}
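Taken together, the HiveEndPoint changes above define a simple contract for clients: a SerializationError leaves the batch usable (the record itself is bad, and will be equally bad on replay), while any other failure runs markDead(), which aborts the remaining txns, closes the writer, and makes further calls throw IllegalStateException. A hedged sketch of the client-side handling this implies (deadLetterQueue is a hypothetical sink, not part of the API):

    try {
      txnBatch.write(record);
    } catch (SerializationError e) {
      // Record-level problem: the batch is still alive, and retrying the same
      // bytes cannot succeed, so divert the record instead of rolling back.
      deadLetterQueue.add(record); // hypothetical sink
    } catch (StreamingException e) {
      // Anything else marked the batch dead: its remaining txns were already
      // aborted and the writer closed. Fetch a fresh batch to continue.
      txnBatch = connection.fetchTransactionBatch(10, writer);
    }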
http://git-wip-us.apache.org/repos/asf/hive/blob/f1ac5a39/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
index 28ea7d6..db73d6b 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
@@ -86,7 +86,7 @@ public class StrictJsonWriter extends AbstractRecordWriter {
}
@Override
- public SerDe getSerde() throws SerializationError {
+ public SerDe getSerde() {
return serde;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f1ac5a39/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java
index d9a083d..3c8670d 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java
@@ -100,4 +100,5 @@ public interface TransactionBatch {
* @throws InterruptedException if call in interrupted
*/
public void close() throws StreamingException, InterruptedException;
+ public boolean isClosed();
}
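The new isClosed() accessor lets a client that heartbeats from a separate thread probe batch liveness without tripping over a batch that died underneath it. A minimal sketch (the interval is a hypothetical tuning knob):

    // Keep the batch's locks/txns alive until it is committed, aborted or dead.
    void keepAlive(TransactionBatch batch, long intervalMs)
        throws StreamingException, InterruptedException {
      while (!batch.isClosed()) {
        batch.heartbeat(); // itself a no-op once the batch is closed
        Thread.sleep(intervalMs);
      }
    }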
http://git-wip-us.apache.org/repos/asf/hive/blob/f1ac5a39/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java
index dd9c83d..198d077 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java
@@ -20,7 +20,7 @@ package org.apache.hive.hcatalog.streaming;
public class TransactionError extends StreamingException {
public TransactionError(String msg, Exception e) {
- super(msg, e);
+ super(msg + (e == null ? "" : ": " + e.getMessage()), e);
}
public TransactionError(String msg) {
http://git-wip-us.apache.org/repos/asf/hive/blob/f1ac5a39/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 1723ff1..d38cdc0 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -29,11 +29,14 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnInfo;
+import org.apache.hadoop.hive.metastore.api.TxnState;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
@@ -1189,7 +1192,120 @@ public class TestStreaming {
}
+ private void runCmdOnDriver(String cmd) throws QueryFailedException {
+ boolean t = runDDL(driver, cmd);
+ Assert.assertTrue(cmd + " failed", t);
+ }
+
+
+ @Test
+ public void testErrorHandling() throws Exception {
+ runCmdOnDriver("create database testErrors");
+ runCmdOnDriver("use testErrors");
+ runCmdOnDriver("create table T(a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testErrors", "T", null);
+ DelimitedInputWriter innerWriter = new DelimitedInputWriter("a,b".split(","),",", endPt);
+ FaultyWriter writer = new FaultyWriter(innerWriter);
+ StreamingConnection connection = endPt.newConnection(false);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
+ txnBatch.close();
+ txnBatch.heartbeat();//this is a no-op on a closed batch
+ txnBatch.abort();//ditto
+ GetOpenTxnsInfoResponse r = msClient.showTxns();
+ Assert.assertEquals("HWM didn't match", 2, r.getTxn_high_water_mark());
+ List<TxnInfo> ti = r.getOpen_txns();
+ Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
+ Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
+
+ Exception expectedEx = null;
+ try {
+ txnBatch.beginNextTransaction();
+ }
+ catch(IllegalStateException ex) {
+ expectedEx = ex;
+ }
+ Assert.assertTrue("beginNextTransaction() should have failed",
+ expectedEx != null && expectedEx.getMessage().contains("has been closed()"));
+ expectedEx = null;
+ try {
+ txnBatch.write("name0,1,Hello streaming".getBytes());
+ }
+ catch(IllegalStateException ex) {
+ expectedEx = ex;
+ }
+ Assert.assertTrue("write() should have failed",
+ expectedEx != null && expectedEx.getMessage().contains("has been closed()"));
+ expectedEx = null;
+ try {
+ txnBatch.commit();
+ }
+ catch(IllegalStateException ex) {
+ expectedEx = ex;
+ }
+ Assert.assertTrue("commit() should have failed",
+ expectedEx != null && expectedEx.getMessage().contains("has been closed()"));
+
+ txnBatch = connection.fetchTransactionBatch(2, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.write("name2,2,Welcome to streaming".getBytes());
+ txnBatch.write("name4,2,more Streaming unlimited".getBytes());
+ txnBatch.write("name5,2,even more Streaming unlimited".getBytes());
+ txnBatch.commit();
+
+ expectedEx = null;
+ txnBatch.beginNextTransaction();
+ writer.enableErrors();
+ try {
+ txnBatch.write("name6,2,Doh!".getBytes());
+ }
+ catch(StreamingIOFailure ex) {
+ expectedEx = ex;
+ }
+ Assert.assertTrue("Wrong exception: " + (expectedEx != null ? expectedEx.getMessage() : "?"),
+ expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred"));
+ expectedEx = null;
+ try {
+ txnBatch.commit();
+ }
+ catch(IllegalStateException ex) {
+ expectedEx = ex;
+ }
+ Assert.assertTrue("commit() should have failed",
+ expectedEx != null && expectedEx.getMessage().contains("has been closed()"));
+
+ r = msClient.showTxns();
+ Assert.assertEquals("HWM didn't match", 4, r.getTxn_high_water_mark());
+ ti = r.getOpen_txns();
+ Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
+ Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
+ //txnid 3 was committed and thus not open
+ Assert.assertEquals("wrong status ti(2)", TxnState.ABORTED, ti.get(2).getState());
+
+ writer.disableErrors();
+ txnBatch = connection.fetchTransactionBatch(2, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.write("name2,2,Welcome to streaming".getBytes());
+ writer.enableErrors();
+ expectedEx = null;
+ try {
+ txnBatch.commit();
+ }
+ catch(StreamingIOFailure ex) {
+ expectedEx = ex;
+ }
+ Assert.assertTrue("Wrong exception: " + (expectedEx != null ? expectedEx.getMessage() : "?"),
+ expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred"));
+
+ r = msClient.showTxns();
+ Assert.assertEquals("HWM didn't match", 6, r.getTxn_high_water_mark());
+ ti = r.getOpen_txns();
+ Assert.assertEquals("wrong status ti(3)", TxnState.ABORTED, ti.get(3).getState());
+ Assert.assertEquals("wrong status ti(4)", TxnState.ABORTED, ti.get(4).getState());
+ txnBatch.abort();
+ }
// assumes un partitioned table
// returns a map<bucketNum, list<record> >
@@ -1411,4 +1527,55 @@ public class TestStreaming {
" }";
}
}
+ /**
+ * This is test-only wrapper around the real RecordWriter.
+ * It can simulate faults from lower levels to test error handling logic.
+ */
+ private static final class FaultyWriter implements RecordWriter {
+ private final RecordWriter delegate;
+ private boolean shouldThrow = false;
+
+ private FaultyWriter(RecordWriter delegate) {
+ assert delegate != null;
+ this.delegate = delegate;
+ }
+ @Override
+ public void write(long transactionId, byte[] record) throws StreamingException {
+ delegate.write(transactionId, record);
+ produceFault();
+ }
+ @Override
+ public void flush() throws StreamingException {
+ delegate.flush();
+ produceFault();
+ }
+ @Override
+ public void clear() throws StreamingException {
+ delegate.clear();
+ }
+ @Override
+ public void newBatch(Long minTxnId, Long maxTxnID) throws StreamingException {
+ delegate.newBatch(minTxnId, maxTxnID);
+ }
+ @Override
+ public void closeBatch() throws StreamingException {
+ delegate.closeBatch();
+ }
+
+ /**
+ * allows testing of "unexpected" errors
+ * @throws StreamingIOFailure
+ */
+ private void produceFault() throws StreamingIOFailure {
+ if(shouldThrow) {
+ throw new StreamingIOFailure("Simulated fault occurred");
+ }
+ }
+ void enableErrors() {
+ shouldThrow = true;
+ }
+ void disableErrors() {
+ shouldThrow = false;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f1ac5a39/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index ee31c23..9098e84 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -268,6 +268,9 @@ public class OrcRecordUpdater implements RecordUpdater {
item.setFieldValue(ROW_ID, rowId);
}
+ public String toString() {
+ return getClass().getName() + "[" + path +"]";
+ }
/**
* To handle multiple INSERT... statements in a single transaction, we want to make sure
* to generate unique {@code rowId} for all inserted rows of the transaction.