You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/07/16 09:52:43 UTC
[25/50] [abbrv] hive git commit: HIVE-11228 - Mutation API should use
semi-shared locks. (Elliot West, via Eugene Koifman)
HIVE-11228 - Mutation API should use semi-shared locks. (Elliot West, via Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3301b92b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3301b92b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3301b92b
Branch: refs/heads/parquet
Commit: 3301b92bcb2a1f779e76d174cd9ac6d83fc66938
Parents: 17f759d
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Mon Jul 13 09:42:07 2015 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Mon Jul 13 09:42:26 2015 -0700
----------------------------------------------------------------------
.../streaming/mutate/client/MutatorClient.java | 11 +-
.../streaming/mutate/client/lock/Lock.java | 73 +++++++----
.../hive/hcatalog/streaming/mutate/package.html | 8 +-
.../streaming/mutate/client/lock/TestLock.java | 121 ++++++++++++-------
4 files changed, 136 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/3301b92b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java
index 2724525..29b828d 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java
@@ -42,7 +42,16 @@ public class MutatorClient implements Closeable {
.lockFailureListener(lockFailureListener == null ? LockFailureListener.NULL_LISTENER : lockFailureListener)
.user(user);
for (AcidTable table : tables) {
- lockOptions.addTable(table.getDatabaseName(), table.getTableName());
+ switch (table.getTableType()) {
+ case SOURCE:
+ lockOptions.addSourceTable(table.getDatabaseName(), table.getTableName());
+ break;
+ case SINK:
+ lockOptions.addSinkTable(table.getDatabaseName(), table.getTableName());
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown TableType: " + table.getTableType());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3301b92b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java
index 21604df..ad0b303 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java
@@ -2,6 +2,7 @@ package org.apache.hive.hcatalog.streaming.mutate.client.lock;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
@@ -35,7 +36,8 @@ public class Lock {
private final IMetaStoreClient metaStoreClient;
private final HeartbeatFactory heartbeatFactory;
private final LockFailureListener listener;
- private final Collection<Table> tableDescriptors;
+ private final Collection<Table> sinks;
+ private final Collection<Table> tables = new HashSet<>();
private final int lockRetries;
private final int retryWaitSeconds;
private final String user;
@@ -46,23 +48,26 @@ public class Lock {
private Long transactionId;
public Lock(IMetaStoreClient metaStoreClient, Options options) {
- this(metaStoreClient, new HeartbeatFactory(), options.hiveConf, options.listener, options.user,
- options.descriptors, options.lockRetries, options.retryWaitSeconds);
+ this(metaStoreClient, new HeartbeatFactory(), options.hiveConf, options.listener, options.user, options.sources,
+ options.sinks, options.lockRetries, options.retryWaitSeconds);
}
/** Visible for testing only. */
Lock(IMetaStoreClient metaStoreClient, HeartbeatFactory heartbeatFactory, HiveConf hiveConf,
- LockFailureListener listener, String user, Collection<Table> tableDescriptors, int lockRetries,
+ LockFailureListener listener, String user, Collection<Table> sources, Collection<Table> sinks, int lockRetries,
int retryWaitSeconds) {
this.metaStoreClient = metaStoreClient;
this.heartbeatFactory = heartbeatFactory;
this.hiveConf = hiveConf;
this.user = user;
- this.tableDescriptors = tableDescriptors;
this.listener = listener;
this.lockRetries = lockRetries;
this.retryWaitSeconds = retryWaitSeconds;
+ this.sinks = sinks;
+ tables.addAll(sources);
+ tables.addAll(sinks);
+
if (LockFailureListener.NULL_LISTENER.equals(listener)) {
LOG.warn("No {} supplied. Data quality and availability cannot be assured.",
LockFailureListener.class.getSimpleName());
@@ -77,6 +82,9 @@ public class Lock {
/** Attempts to acquire the table locks for the given transaction; returns if successful, throws an exception otherwise. */
public void acquire(long transactionId) throws LockException {
+ if (transactionId <= 0) {
+ throw new IllegalArgumentException("Invalid transaction id: " + transactionId);
+ }
lockId = internalAcquire(transactionId);
this.transactionId = transactionId;
initiateHeartbeat();
@@ -96,19 +104,18 @@ public class Lock {
@Override
public String toString() {
- return "Lock [metaStoreClient=" + metaStoreClient + ", lockId=" + lockId + ", transactionId=" + transactionId
- + "]";
+ return "Lock [metaStoreClient=" + metaStoreClient + ", lockId=" + lockId + ", transactionId=" + transactionId + "]";
}
private long internalAcquire(Long transactionId) throws LockException {
int attempts = 0;
- LockRequest request = buildSharedLockRequest(transactionId);
+ LockRequest request = buildLockRequest(transactionId);
do {
LockResponse response = null;
try {
response = metaStoreClient.lock(request);
} catch (TException e) {
- throw new LockException("Unable to acquire lock for tables: [" + join(tableDescriptors) + "]", e);
+ throw new LockException("Unable to acquire lock for tables: [" + join(tables) + "]", e);
}
if (response != null) {
LockState state = response.getState();
@@ -129,7 +136,7 @@ public class Lock {
}
attempts++;
} while (attempts < lockRetries);
- throw new LockException("Could not acquire lock on tables: [" + join(tableDescriptors) + "]");
+ throw new LockException("Could not acquire lock on tables: [" + join(tables) + "]");
}
private void internalRelease() {
@@ -142,18 +149,24 @@ public class Lock {
}
} catch (TException e) {
LOG.error("Lock " + lockId + " failed.", e);
- listener.lockFailed(lockId, transactionId, asStrings(tableDescriptors), e);
+ listener.lockFailed(lockId, transactionId, asStrings(tables), e);
}
}
- private LockRequest buildSharedLockRequest(Long transactionId) {
+ private LockRequest buildLockRequest(Long transactionId) {
+ if (transactionId == null && !sinks.isEmpty()) {
+ throw new IllegalArgumentException("Cannot sink to tables outside of a transaction: sinks=" + asStrings(sinks));
+ }
LockRequestBuilder requestBuilder = new LockRequestBuilder();
- for (Table descriptor : tableDescriptors) {
- LockComponent component = new LockComponentBuilder()
- .setDbName(descriptor.getDbName())
- .setTableName(descriptor.getTableName())
- .setShared()
- .build();
+ for (Table table : tables) {
+ LockComponentBuilder componentBuilder = new LockComponentBuilder().setDbName(table.getDbName()).setTableName(
+ table.getTableName());
+ if (sinks.contains(table)) {
+ componentBuilder.setSemiShared();
+ } else {
+ componentBuilder.setShared();
+ }
+ LockComponent component = componentBuilder.build();
requestBuilder.addLockComponent(component);
}
if (transactionId != null) {
@@ -166,8 +179,7 @@ public class Lock {
private void initiateHeartbeat() {
int heartbeatPeriod = getHeartbeatPeriod();
LOG.debug("Heartbeat period {}s", heartbeatPeriod);
- heartbeat = heartbeatFactory.newInstance(metaStoreClient, listener, transactionId, tableDescriptors, lockId,
- heartbeatPeriod);
+ heartbeat = heartbeatFactory.newInstance(metaStoreClient, listener, transactionId, tables, lockId, heartbeatPeriod);
}
private int getHeartbeatPeriod() {
@@ -210,22 +222,33 @@ public class Lock {
/** Constructs lock options for a set of Hive ACID tables that we wish to read from or write to. */
public static final class Options {
- Set<Table> descriptors = new LinkedHashSet<>();
+ Set<Table> sources = new LinkedHashSet<>();
+ Set<Table> sinks = new LinkedHashSet<>();
LockFailureListener listener = LockFailureListener.NULL_LISTENER;
int lockRetries = 5;
int retryWaitSeconds = 30;
String user;
HiveConf hiveConf;
- /** Adds a table for which a shared read lock will be requested. */
- public Options addTable(String databaseName, String tableName) {
+ /** Adds a table for which a shared lock will be requested. */
+ public Options addSourceTable(String databaseName, String tableName) {
+ addTable(databaseName, tableName, sources);
+ return this;
+ }
+
+ /** Adds a table for which a semi-shared lock will be requested. */
+ public Options addSinkTable(String databaseName, String tableName) {
+ addTable(databaseName, tableName, sinks);
+ return this;
+ }
+
+ private void addTable(String databaseName, String tableName, Set<Table> tables) {
checkNotNullOrEmpty(databaseName);
checkNotNullOrEmpty(tableName);
Table table = new Table();
table.setDbName(databaseName);
table.setTableName(tableName);
- descriptors.add(table);
- return this;
+ tables.add(table);
}
public Options user(String user) {
http://git-wip-us.apache.org/repos/asf/hive/blob/3301b92b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
index 9fc10b6..09a55b6 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
@@ -421,7 +421,7 @@ automatically (say on a hourly basis). In such cases requiring the Hive
admin to pre-create the necessary partitions may not be reasonable.
Consequently the API allows coordinators to create partitions as needed
(see:
-<code>MutatorClientBuilder.addTable(String, String, boolean)</code>
+<code>MutatorClientBuilder.addSinkTable(String, String, boolean)</code>
). Partition creation being an atomic action, multiple coordinators can
race to create the partition, but only one would succeed, so
coordinator clients need not synchronize when creating a partition. The
@@ -440,14 +440,14 @@ consistent manner requires the following:
<ol>
<li>Obtaining a valid transaction list from the meta store (<code>ValidTxnList</code>).
</li>
-<li>Acquiring a read-lock with the meta store and issuing
-heartbeats (<code>LockImpl</code> can help with this).
+<li>Acquiring a lock with the meta store and issuing heartbeats (<code>LockImpl</code>
+can help with this).
</li>
<li>Configuring the <code>OrcInputFormat</code> and then reading
the data. Make sure that you also pull in the <code>ROW__ID</code>
values. See: <code>AcidRecordReader.getRecordIdentifier</code>.
</li>
-<li>Releasing the read-lock.</li>
+<li>Releasing the lock.</li>
</ol>
</p>
http://git-wip-us.apache.org/repos/asf/hive/blob/3301b92b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java
index ef1e80c..05f342b 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java
@@ -19,7 +19,9 @@ import static org.mockito.Mockito.when;
import java.net.InetAddress;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.Timer;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -42,14 +44,17 @@ import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
@RunWith(MockitoJUnitRunner.class)
public class TestLock {
- private static final Table TABLE_1 = createTable("DB", "ONE");
- private static final Table TABLE_2 = createTable("DB", "TWO");
- private static final List<Table> TABLES = ImmutableList.of(TABLE_1, TABLE_2);
+ private static final Table SOURCE_TABLE_1 = createTable("DB", "SOURCE_1");
+ private static final Table SOURCE_TABLE_2 = createTable("DB", "SOURCE_2");
+ private static final Table SINK_TABLE = createTable("DB", "SINK");
+ private static final Set<Table> SOURCES = ImmutableSet.of(SOURCE_TABLE_1, SOURCE_TABLE_2);
+ private static final Set<Table> SINKS = ImmutableSet.of(SINK_TABLE);
+ private static final Set<Table> TABLES = ImmutableSet.of(SOURCE_TABLE_1, SOURCE_TABLE_2, SINK_TABLE);
private static final long LOCK_ID = 42;
private static final long TRANSACTION_ID = 109;
private static final String USER = "ewest";
@@ -67,7 +72,8 @@ public class TestLock {
@Captor
private ArgumentCaptor<LockRequest> requestCaptor;
- private Lock lock;
+ private Lock readLock;
+ private Lock writeLock;
private HiveConf configuration = new HiveConf();
@Before
@@ -79,44 +85,57 @@ public class TestLock {
mockHeartbeatFactory.newInstance(any(IMetaStoreClient.class), any(LockFailureListener.class), any(Long.class),
any(Collection.class), anyLong(), anyInt())).thenReturn(mockHeartbeat);
- lock = new Lock(mockMetaStoreClient, mockHeartbeatFactory, configuration, mockListener, USER, TABLES, 3, 0);
+ readLock = new Lock(mockMetaStoreClient, mockHeartbeatFactory, configuration, mockListener, USER, SOURCES,
+ Collections.<Table> emptySet(), 3, 0);
+ writeLock = new Lock(mockMetaStoreClient, mockHeartbeatFactory, configuration, mockListener, USER, SOURCES, SINKS,
+ 3, 0);
}
@Test
public void testAcquireReadLockWithNoIssues() throws Exception {
- lock.acquire();
- assertEquals(Long.valueOf(LOCK_ID), lock.getLockId());
- assertNull(lock.getTransactionId());
+ readLock.acquire();
+ assertEquals(Long.valueOf(LOCK_ID), readLock.getLockId());
+ assertNull(readLock.getTransactionId());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testAcquireWriteLockWithoutTxn() throws Exception {
+ writeLock.acquire();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testAcquireWriteLockWithInvalidTxn() throws Exception {
+ writeLock.acquire(0);
}
@Test
public void testAcquireTxnLockWithNoIssues() throws Exception {
- lock.acquire(TRANSACTION_ID);
- assertEquals(Long.valueOf(LOCK_ID), lock.getLockId());
- assertEquals(Long.valueOf(TRANSACTION_ID), lock.getTransactionId());
+ writeLock.acquire(TRANSACTION_ID);
+ assertEquals(Long.valueOf(LOCK_ID), writeLock.getLockId());
+ assertEquals(Long.valueOf(TRANSACTION_ID), writeLock.getTransactionId());
}
@Test
public void testAcquireReadLockCheckHeartbeatCreated() throws Exception {
configuration.set("hive.txn.timeout", "100s");
- lock.acquire();
+ readLock.acquire();
- verify(mockHeartbeatFactory).newInstance(eq(mockMetaStoreClient), eq(mockListener), any(Long.class), eq(TABLES),
+ verify(mockHeartbeatFactory).newInstance(eq(mockMetaStoreClient), eq(mockListener), any(Long.class), eq(SOURCES),
eq(LOCK_ID), eq(75));
}
@Test
public void testAcquireTxnLockCheckHeartbeatCreated() throws Exception {
configuration.set("hive.txn.timeout", "100s");
- lock.acquire(TRANSACTION_ID);
+ writeLock.acquire(TRANSACTION_ID);
- verify(mockHeartbeatFactory).newInstance(eq(mockMetaStoreClient), eq(mockListener), eq(TRANSACTION_ID), eq(TABLES),
- eq(LOCK_ID), eq(75));
+ verify(mockHeartbeatFactory).newInstance(eq(mockMetaStoreClient), eq(mockListener), eq(TRANSACTION_ID),
+ eq(TABLES), eq(LOCK_ID), eq(75));
}
@Test
public void testAcquireLockCheckUser() throws Exception {
- lock.acquire();
+ readLock.acquire();
verify(mockMetaStoreClient).lock(requestCaptor.capture());
LockRequest actualRequest = requestCaptor.getValue();
assertEquals(USER, actualRequest.getUser());
@@ -124,7 +143,7 @@ public class TestLock {
@Test
public void testAcquireReadLockCheckLocks() throws Exception {
- lock.acquire();
+ readLock.acquire();
verify(mockMetaStoreClient).lock(requestCaptor.capture());
LockRequest request = requestCaptor.getValue();
@@ -137,17 +156,17 @@ public class TestLock {
assertEquals(2, components.size());
LockComponent expected1 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB");
- expected1.setTablename("ONE");
+ expected1.setTablename("SOURCE_1");
assertTrue(components.contains(expected1));
LockComponent expected2 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB");
- expected2.setTablename("TWO");
+ expected2.setTablename("SOURCE_2");
assertTrue(components.contains(expected2));
}
@Test
public void testAcquireTxnLockCheckLocks() throws Exception {
- lock.acquire(TRANSACTION_ID);
+ writeLock.acquire(TRANSACTION_ID);
verify(mockMetaStoreClient).lock(requestCaptor.capture());
LockRequest request = requestCaptor.getValue();
@@ -157,73 +176,77 @@ public class TestLock {
List<LockComponent> components = request.getComponent();
- System.out.println(components);
- assertEquals(2, components.size());
+ assertEquals(3, components.size());
LockComponent expected1 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB");
- expected1.setTablename("ONE");
+ expected1.setTablename("SOURCE_1");
assertTrue(components.contains(expected1));
LockComponent expected2 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB");
- expected2.setTablename("TWO");
+ expected2.setTablename("SOURCE_2");
assertTrue(components.contains(expected2));
+
+ LockComponent expected3 = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "DB");
+ expected3.setTablename("SINK");
+ assertTrue(components.contains(expected3));
}
@Test(expected = LockException.class)
public void testAcquireLockNotAcquired() throws Exception {
when(mockLockResponse.getState()).thenReturn(NOT_ACQUIRED);
- lock.acquire();
+ readLock.acquire();
}
@Test(expected = LockException.class)
public void testAcquireLockAborted() throws Exception {
when(mockLockResponse.getState()).thenReturn(ABORT);
- lock.acquire();
+ readLock.acquire();
}
@Test(expected = LockException.class)
public void testAcquireLockWithWaitRetriesExceeded() throws Exception {
when(mockLockResponse.getState()).thenReturn(WAITING, WAITING, WAITING);
- lock.acquire();
+ readLock.acquire();
}
@Test
public void testAcquireLockWithWaitRetries() throws Exception {
when(mockLockResponse.getState()).thenReturn(WAITING, WAITING, ACQUIRED);
- lock.acquire();
- assertEquals(Long.valueOf(LOCK_ID), lock.getLockId());
+ readLock.acquire();
+ assertEquals(Long.valueOf(LOCK_ID), readLock.getLockId());
}
@Test
public void testReleaseLock() throws Exception {
- lock.acquire();
- lock.release();
+ readLock.acquire();
+ readLock.release();
verify(mockMetaStoreClient).unlock(LOCK_ID);
}
@Test
public void testReleaseLockNoLock() throws Exception {
- lock.release();
+ readLock.release();
verifyNoMoreInteractions(mockMetaStoreClient);
}
@Test
public void testReleaseLockCancelsHeartbeat() throws Exception {
- lock.acquire();
- lock.release();
+ readLock.acquire();
+ readLock.release();
verify(mockHeartbeat).cancel();
}
@Test
public void testReadHeartbeat() throws Exception {
- HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, null, TABLES, LOCK_ID);
+ HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, null, SOURCES, LOCK_ID);
task.run();
verify(mockMetaStoreClient).heartbeat(0, LOCK_ID);
}
@Test
public void testTxnHeartbeat() throws Exception {
- HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+ HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, SOURCES,
+ LOCK_ID);
task.run();
verify(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
}
@@ -232,43 +255,47 @@ public class TestLock {
public void testReadHeartbeatFailsNoSuchLockException() throws Exception {
Throwable t = new NoSuchLockException();
doThrow(t).when(mockMetaStoreClient).heartbeat(0, LOCK_ID);
- HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, null, TABLES, LOCK_ID);
+ HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, null, SOURCES, LOCK_ID);
task.run();
- verify(mockListener).lockFailed(LOCK_ID, null, Lock.asStrings(TABLES), t);
+ verify(mockListener).lockFailed(LOCK_ID, null, Lock.asStrings(SOURCES), t);
}
@Test
public void testTxnHeartbeatFailsNoSuchLockException() throws Exception {
Throwable t = new NoSuchLockException();
doThrow(t).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
- HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+ HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, SOURCES,
+ LOCK_ID);
task.run();
- verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(TABLES), t);
+ verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(SOURCES), t);
}
@Test
public void testHeartbeatFailsNoSuchTxnException() throws Exception {
Throwable t = new NoSuchTxnException();
doThrow(t).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
- HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+ HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, SOURCES,
+ LOCK_ID);
task.run();
- verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(TABLES), t);
+ verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(SOURCES), t);
}
@Test
public void testHeartbeatFailsTxnAbortedException() throws Exception {
Throwable t = new TxnAbortedException();
doThrow(t).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
- HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+ HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, SOURCES,
+ LOCK_ID);
task.run();
- verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(TABLES), t);
+ verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(SOURCES), t);
}
@Test
public void testHeartbeatContinuesTException() throws Exception {
Throwable t = new TException();
doThrow(t).when(mockMetaStoreClient).heartbeat(0, LOCK_ID);
- HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+ HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, SOURCES,
+ LOCK_ID);
task.run();
verifyZeroInteractions(mockListener);
}