You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/06 23:02:35 UTC
[29/51] [partial] incubator-tephra git commit: Rename package to
org.apache.tephra
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/distributed/ElasticPoolTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/distributed/ElasticPoolTest.java b/tephra-core/src/test/java/co/cask/tephra/distributed/ElasticPoolTest.java
deleted file mode 100644
index 40354cf..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/distributed/ElasticPoolTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import com.google.common.base.Throwables;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Tests for {@link ElasticPool}.
- */
-public class ElasticPoolTest {
-
- static class Dummy {
- static AtomicInteger count = new AtomicInteger(0);
- boolean valid = true;
- Dummy() {
- count.incrementAndGet();
- }
- void markInvalid() {
- valid = false;
- }
-
- public boolean isValid() {
- return valid;
- }
- }
-
- class DummyPool extends ElasticPool<Dummy, RuntimeException> {
-
- public DummyPool(int sizeLimit) {
- super(sizeLimit);
- }
-
- @Override
- protected Dummy create() {
- return new Dummy();
- }
-
- @Override
- protected boolean recycle(Dummy element) {
- return element.isValid();
- }
- }
-
- @Test(timeout = 5000)
- public void testFewerThreadsThanElements() throws InterruptedException {
- final DummyPool pool = new DummyPool(5);
- Dummy.count.set(0);
- createAndRunThreads(2, pool, false);
- // we only ran 2 threads, so only 2 elements got created, even though pool size is 5
- Assert.assertEquals(2, Dummy.count.get());
- }
-
- @Test(timeout = 5000)
- public void testMoreThreadsThanElements() throws InterruptedException {
- final DummyPool pool = new DummyPool(2);
- Dummy.count.set(0);
- createAndRunThreads(5, pool, false);
- // even though we ran 5 threads, only 2 elements got created because pool size is 2
- Assert.assertEquals(2, Dummy.count.get());
- }
-
- @Test(timeout = 5000)
- public void testMoreThreadsThanElementsWithDiscard() throws InterruptedException {
- final DummyPool pool = new DummyPool(2);
- Dummy.count.set(0);
- int numThreads = 3;
- // pass 'true' as the last parameter, which results in the elements being discarded after each obtain() call.
- createAndRunThreads(numThreads, pool, true);
- // this results in (5 * numThreads) number of elements being created since each thread obtains a client 5 times.
- Assert.assertEquals(5 * numThreads, Dummy.count.get());
- }
-
- // Creates a list of threads, each of which obtains a client from the pool, sleeps for a certain amount of time,
- // and then releases the client back to the pool, optionally marking it invalid before doing so. Each thread
- // repeats this five times. Then, runs these threads to completion.
- private void createAndRunThreads(int numThreads, final DummyPool pool,
- final boolean discardAtEnd) throws InterruptedException {
- Thread[] threads = new Thread[numThreads];
- for (int i = 0; i < numThreads; i++) {
- threads[i] = new Thread() {
- @Override
- public void run() {
- for (int j = 0; j < 5; ++j) {
- Dummy dummy;
- try {
- dummy = pool.obtain();
- } catch (InterruptedException e) {
- throw Throwables.propagate(e);
- }
- try {
- Thread.sleep(10L);
- } catch (InterruptedException e) {
- // ignored
- }
- if (discardAtEnd) {
- dummy.markInvalid();
- }
- pool.release(dummy);
- }
- }
- };
- }
- for (Thread t : threads) {
- t.start();
- }
- for (Thread t : threads) {
- t.join();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/distributed/PooledClientProviderTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/distributed/PooledClientProviderTest.java b/tephra-core/src/test/java/co/cask/tephra/distributed/PooledClientProviderTest.java
deleted file mode 100644
index 354b5b7..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/distributed/PooledClientProviderTest.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import co.cask.tephra.TransactionServiceMain;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.runtime.ConfigModule;
-import co.cask.tephra.runtime.DiscoveryModules;
-import co.cask.tephra.runtime.TransactionClientModule;
-import co.cask.tephra.runtime.TransactionModules;
-import co.cask.tephra.runtime.ZKModule;
-import com.google.common.base.Throwables;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.twill.discovery.DiscoveryServiceClient;
-import org.apache.twill.internal.zookeeper.InMemoryZKServer;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.junit.Assert;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeoutException;
-
-public class PooledClientProviderTest {
-
- public static final int MAX_CLIENT_COUNT = 3;
- public static final long CLIENT_OBTAIN_TIMEOUT = 10;
-
- @ClassRule
- public static TemporaryFolder tmpFolder = new TemporaryFolder();
-
- @Test
- public void testClientConnectionPoolMaximumNumberOfClients() throws Exception {
- // We need a server for the client to connect to
- InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
- zkServer.startAndWait();
-
- try {
- Configuration conf = new Configuration();
- conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr());
- conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
- conf.set("data.tx.client.count", Integer.toString(MAX_CLIENT_COUNT));
- conf.set("data.tx.client.obtain.timeout", Long.toString(CLIENT_OBTAIN_TIMEOUT));
-
- final TransactionServiceMain main = new TransactionServiceMain(conf);
- final CountDownLatch latch = new CountDownLatch(1);
- Thread t = new Thread() {
- @Override
- public void run() {
- try {
- main.start();
- latch.countDown();
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
- };
-
- try {
- t.start();
- // Wait for service to startup
- latch.await();
-
- startClientAndTestPool(conf);
- } finally {
- main.stop();
- t.join();
- }
- } finally {
- zkServer.stopAndWait();
- }
- }
-
- private void startClientAndTestPool(Configuration conf) throws Exception {
- Injector injector = Guice.createInjector(
- new ConfigModule(conf),
- new ZKModule(),
- new DiscoveryModules().getDistributedModules(),
- new TransactionModules().getDistributedModules(),
- new TransactionClientModule()
- );
-
- ZKClientService zkClient = injector.getInstance(ZKClientService.class);
- zkClient.startAndWait();
-
- final PooledClientProvider clientProvider = new PooledClientProvider(conf,
- injector.getInstance(DiscoveryServiceClient.class));
-
- // test simple case of get + return. Note: this also initializes the provider's pool, which
- // takes about one second (discovery). Doing it before we test the threads makes it so that one
- // thread doesn't take exceptionally longer than the others.
- try (CloseableThriftClient closeableThriftClient = clientProvider.getCloseableClient()) {
- // do nothing with the client
- }
-
- //Now race to get MAX_CLIENT_COUNT+1 clients, exhausting the pool and requesting 1 more.
- List<Future<Integer>> clientIds = new ArrayList<Future<Integer>>();
- CountDownLatch countDownLatch = new CountDownLatch(1);
- ExecutorService executor = Executors.newFixedThreadPool(MAX_CLIENT_COUNT + 1);
- for (int i = 0; i < MAX_CLIENT_COUNT + 1; i++) {
- clientIds.add(executor.submit(new RetrieveClient(clientProvider, CLIENT_OBTAIN_TIMEOUT / 2, countDownLatch)));
- }
- countDownLatch.countDown();
-
- Set<Integer> ids = new HashSet<Integer>();
- for (Future<Integer> id : clientIds) {
- ids.add(id.get());
- }
- Assert.assertEquals(MAX_CLIENT_COUNT, ids.size());
-
- // now, try it again, where each thread holds onto the client for twice the client.obtain.timeout value.
- // one of the threads should throw a TimeoutException, because the other threads don't release their clients
- // within the configured timeout.
- countDownLatch = new CountDownLatch(1);
- for (int i = 0; i < MAX_CLIENT_COUNT + 1; i++) {
- clientIds.add(executor.submit(new RetrieveClient(clientProvider, CLIENT_OBTAIN_TIMEOUT * 2, countDownLatch)));
- }
- countDownLatch.countDown();
- int numTimeoutExceptions = 0;
- for (Future<Integer> clientId : clientIds) {
- try {
- clientId.get();
- } catch (ExecutionException expected) {
- Assert.assertEquals(TimeoutException.class, expected.getCause().getClass());
- numTimeoutExceptions++;
- }
- }
- // expect that exactly one of the threads hit the TimeoutException
- Assert.assertEquals(String.format("Expected one thread to not obtain a client within %s milliseconds.",
- CLIENT_OBTAIN_TIMEOUT),
- 1, numTimeoutExceptions);
-
- executor.shutdown();
- }
-
- private static class RetrieveClient implements Callable<Integer> {
- private final PooledClientProvider pool;
- private final long holdClientMs;
- private final CountDownLatch begin;
-
- public RetrieveClient(PooledClientProvider pool, long holdClientMs,
- CountDownLatch begin) {
- this.pool = pool;
- this.holdClientMs = holdClientMs;
- this.begin = begin;
- }
-
- @Override
- public Integer call() throws Exception {
- begin.await();
- try (CloseableThriftClient client = pool.getCloseableClient()) {
- int id = System.identityHashCode(client.getThriftClient());
- // "use" the client for a configured amount of milliseconds
- Thread.sleep(holdClientMs);
- return id;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/distributed/ThriftTransactionServerTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/distributed/ThriftTransactionServerTest.java b/tephra-core/src/test/java/co/cask/tephra/distributed/ThriftTransactionServerTest.java
deleted file mode 100644
index 83aad3c..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/distributed/ThriftTransactionServerTest.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import co.cask.tephra.ThriftTransactionSystemTest;
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.persist.InMemoryTransactionStateStorage;
-import co.cask.tephra.persist.TransactionEdit;
-import co.cask.tephra.persist.TransactionLog;
-import co.cask.tephra.persist.TransactionStateStorage;
-import co.cask.tephra.runtime.ConfigModule;
-import co.cask.tephra.runtime.DiscoveryModules;
-import co.cask.tephra.runtime.TransactionClientModule;
-import co.cask.tephra.runtime.TransactionModules;
-import co.cask.tephra.runtime.ZKModule;
-import com.google.common.util.concurrent.Service;
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.Scopes;
-import com.google.inject.util.Modules;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.twill.internal.zookeeper.InMemoryZKServer;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-/**
- * This tests whether transaction service hangs on stop when heavily loaded - https://issues.cask.co/browse/TEPHRA-132
- */
-public class ThriftTransactionServerTest {
- private static final Logger LOG = LoggerFactory.getLogger(ThriftTransactionSystemTest.class);
-
- private static InMemoryZKServer zkServer;
- private static ZKClientService zkClientService;
- private static TransactionService txService;
- private static TransactionStateStorage storage;
- static Injector injector;
-
- private static final int NUM_CLIENTS = 17;
- private static final CountDownLatch STORAGE_WAIT_LATCH = new CountDownLatch(1);
- private static final CountDownLatch CLIENTS_DONE_LATCH = new CountDownLatch(NUM_CLIENTS);
-
- @ClassRule
- public static TemporaryFolder tmpFolder = new TemporaryFolder();
-
- @BeforeClass
- public static void start() throws Exception {
- zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
- zkServer.startAndWait();
-
- Configuration conf = new Configuration();
- conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
- conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr());
- conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
- conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
- conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_COUNT, NUM_CLIENTS);
- conf.setLong(TxConstants.Service.CFG_DATA_TX_CLIENT_TIMEOUT, TimeUnit.HOURS.toMillis(1));
- conf.setInt(TxConstants.Service.CFG_DATA_TX_SERVER_IO_THREADS, 2);
- conf.setInt(TxConstants.Service.CFG_DATA_TX_SERVER_THREADS, 4);
-
- injector = Guice.createInjector(
- new ConfigModule(conf),
- new ZKModule(),
- new DiscoveryModules().getDistributedModules(),
- Modules.override(new TransactionModules().getDistributedModules())
- .with(new AbstractModule() {
- @Override
- protected void configure() {
- bind(TransactionStateStorage.class).to(SlowTransactionStorage.class).in(Scopes.SINGLETON);
- }
- }),
- new TransactionClientModule()
- );
-
- zkClientService = injector.getInstance(ZKClientService.class);
- zkClientService.startAndWait();
-
- // start a tx server
- txService = injector.getInstance(TransactionService.class);
- storage = injector.getInstance(TransactionStateStorage.class);
- try {
- LOG.info("Starting transaction service");
- txService.startAndWait();
- } catch (Exception e) {
- LOG.error("Failed to start service: ", e);
- }
- }
-
- @Before
- public void reset() throws Exception {
- getClient().resetState();
- }
-
- @AfterClass
- public static void stop() throws Exception {
- txService.stopAndWait();
- storage.stopAndWait();
- zkClientService.stopAndWait();
- zkServer.stopAndWait();
- }
-
- public TransactionSystemClient getClient() throws Exception {
- return injector.getInstance(TransactionSystemClient.class);
- }
-
- @Test
- public void testThriftServerStop() throws Exception {
- int nThreads = NUM_CLIENTS;
- ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
- for (int i = 0; i < nThreads; ++i) {
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- try {
- TransactionSystemClient txClient = getClient();
- CLIENTS_DONE_LATCH.countDown();
- txClient.startShort();
- } catch (Exception e) {
- // Exception expected
- }
- }
- });
- }
-
- // Wait till all clients finish sending request to transaction manager
- CLIENTS_DONE_LATCH.await();
- TimeUnit.SECONDS.sleep(1);
-
- // Expire zookeeper session, which causes Thrift server to stop.
- expireZkSession(zkClientService);
- waitForThriftTermination();
-
- // Stop Zookeeper client so that it does not re-connect to Zookeeper and start Thrift server again.
- zkClientService.stopAndWait();
- STORAGE_WAIT_LATCH.countDown();
- TimeUnit.SECONDS.sleep(1);
-
- // Make sure Thrift server stopped.
- Assert.assertEquals(Service.State.TERMINATED, txService.thriftRPCServerState());
- }
-
- private void expireZkSession(ZKClientService zkClientService) throws Exception {
- ZooKeeper zooKeeper = zkClientService.getZooKeeperSupplier().get();
- final SettableFuture<?> connectFuture = SettableFuture.create();
- Watcher watcher = new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- if (event.getState() == Event.KeeperState.SyncConnected) {
- connectFuture.set(null);
- }
- }
- };
-
- // Create another Zookeeper session with the same sessionId so that the original one expires.
- final ZooKeeper dupZookeeper =
- new ZooKeeper(zkClientService.getConnectString(), zooKeeper.getSessionTimeout(), watcher,
- zooKeeper.getSessionId(), zooKeeper.getSessionPasswd());
- connectFuture.get(30, TimeUnit.SECONDS);
- Assert.assertEquals("Failed to re-create current session", dupZookeeper.getState(), ZooKeeper.States.CONNECTED);
- dupZookeeper.close();
- }
-
- private void waitForThriftTermination() throws InterruptedException {
- int count = 0;
- while (txService.thriftRPCServerState() != Service.State.TERMINATED && count++ < 200) {
- TimeUnit.MILLISECONDS.sleep(50);
- }
- }
-
- private static class SlowTransactionStorage extends InMemoryTransactionStateStorage {
- @Override
- public TransactionLog createLog(long timestamp) throws IOException {
- return new SlowTransactionLog(timestamp);
- }
- }
-
- private static class SlowTransactionLog extends InMemoryTransactionStateStorage.InMemoryTransactionLog {
- public SlowTransactionLog(long timestamp) {
- super(timestamp);
- }
-
- @Override
- public void append(TransactionEdit edit) throws IOException {
- try {
- STORAGE_WAIT_LATCH.await();
- } catch (InterruptedException e) {
- LOG.error("Got exception: ", e);
- }
- super.append(edit);
- }
-
- @Override
- public void append(List<TransactionEdit> edits) throws IOException {
- try {
- STORAGE_WAIT_LATCH.await();
- } catch (InterruptedException e) {
- LOG.error("Got exception: ", e);
- }
- super.append(edits);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/hbase/AbstractTransactionVisibilityFilterTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/hbase/AbstractTransactionVisibilityFilterTest.java b/tephra-core/src/test/java/co/cask/tephra/hbase/AbstractTransactionVisibilityFilterTest.java
deleted file mode 100644
index 6427b07..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/hbase/AbstractTransactionVisibilityFilterTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.hbase;
-
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.util.ConfigurationFactory;
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.After;
-import org.junit.Before;
-
-import java.util.List;
-
-/**
- * Common test class for TransactionVisibilityFilter implementations.
- */
-public abstract class AbstractTransactionVisibilityFilterTest {
-
- protected static final byte[] FAM = new byte[] {'f'};
- protected static final byte[] FAM2 = new byte[] {'f', '2'};
- protected static final byte[] FAM3 = new byte[] {'f', '3'};
- protected static final byte[] COL = new byte[] {'c'};
- protected static final List<byte[]> EMPTY_CHANGESET = Lists.newArrayListWithCapacity(0);
-
- protected TransactionManager txManager;
-
- @Before
- public void setup() throws Exception {
- Configuration conf = new ConfigurationFactory().get();
- conf.unset(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES);
- txManager = new TransactionManager(conf);
- txManager.startAndWait();
- }
-
- @After
- public void tearDown() throws Exception {
- txManager.stopAndWait();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/persist/AbstractTransactionStateStorageTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/persist/AbstractTransactionStateStorageTest.java b/tephra-core/src/test/java/co/cask/tephra/persist/AbstractTransactionStateStorageTest.java
deleted file mode 100644
index b2bf69c..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/persist/AbstractTransactionStateStorageTest.java
+++ /dev/null
@@ -1,555 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.ChangeId;
-import co.cask.tephra.Transaction;
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TransactionType;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.metrics.TxMetricsCollector;
-import co.cask.tephra.util.TransactionEditUtil;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import it.unimi.dsi.fastutil.longs.LongArrayList;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-/**
- * Common tests to run against the {@link TransactionStateStorage} implementations.
- */
-public abstract class AbstractTransactionStateStorageTest {
- private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionStateStorageTest.class);
- private static Random random = new Random();
-
- protected abstract Configuration getConfiguration(String testName) throws IOException;
-
- protected abstract AbstractTransactionStateStorage getStorage(Configuration conf);
-
- @Test
- public void testSnapshotPersistence() throws Exception {
- Configuration conf = getConfiguration("testSnapshotPersistence");
-
- TransactionSnapshot snapshot = createRandomSnapshot();
- TransactionStateStorage storage = getStorage(conf);
- try {
- storage.startAndWait();
- storage.writeSnapshot(snapshot);
-
- TransactionSnapshot readSnapshot = storage.getLatestSnapshot();
- assertNotNull(readSnapshot);
- assertEquals(snapshot, readSnapshot);
- } finally {
- storage.stopAndWait();
- }
- }
-
- @Test
- public void testLogWriteAndRead() throws Exception {
- Configuration conf = getConfiguration("testLogWriteAndRead");
-
- // create some random entries
- List<TransactionEdit> edits = TransactionEditUtil.createRandomEdits(100);
- TransactionStateStorage storage = getStorage(conf);
- try {
- long now = System.currentTimeMillis();
- storage.startAndWait();
- TransactionLog log = storage.createLog(now);
- for (TransactionEdit edit : edits) {
- log.append(edit);
- }
- log.close();
-
- Collection<TransactionLog> logsToRead = storage.getLogsSince(now);
- // should only be our one log
- assertNotNull(logsToRead);
- assertEquals(1, logsToRead.size());
- TransactionLogReader logReader = logsToRead.iterator().next().getReader();
- assertNotNull(logReader);
-
- List<TransactionEdit> readEdits = Lists.newArrayListWithExpectedSize(edits.size());
- TransactionEdit nextEdit;
- while ((nextEdit = logReader.next()) != null) {
- readEdits.add(nextEdit);
- }
- logReader.close();
- assertEquals(edits.size(), readEdits.size());
- for (int i = 0; i < edits.size(); i++) {
- LOG.info("Checking edit " + i);
- assertEquals(edits.get(i), readEdits.get(i));
- }
- } finally {
- storage.stopAndWait();
- }
- }
-
- @Test
- public void testTransactionManagerPersistence() throws Exception {
- Configuration conf = getConfiguration("testTransactionManagerPersistence");
- conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 0); // no cleanup thread
- // start snapshot thread, but with long enough interval so we only get snapshots on shutdown
- conf.setInt(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 600);
-
- TransactionStateStorage storage = null;
- TransactionStateStorage storage2 = null;
- TransactionStateStorage storage3 = null;
- try {
- storage = getStorage(conf);
- TransactionManager txManager = new TransactionManager
- (conf, storage, new TxMetricsCollector());
- txManager.startAndWait();
-
- // TODO: replace with new persistence tests
- final byte[] a = { 'a' };
- final byte[] b = { 'b' };
- // start a tx1, add a change A and commit
- Transaction tx1 = txManager.startShort();
- Assert.assertTrue(txManager.canCommit(tx1, Collections.singleton(a)));
- Assert.assertTrue(txManager.commit(tx1));
- // start a tx2 and add a change B
- Transaction tx2 = txManager.startShort();
- Assert.assertTrue(txManager.canCommit(tx2, Collections.singleton(b)));
- // start a tx3
- Transaction tx3 = txManager.startShort();
- // restart
- txManager.stopAndWait();
- TransactionSnapshot origState = txManager.getCurrentState();
- LOG.info("Orig state: " + origState);
-
- Thread.sleep(100);
- // starts a new tx manager
- storage2 = getStorage(conf);
- txManager = new TransactionManager(conf, storage2, new TxMetricsCollector());
- txManager.startAndWait();
-
- // check that the reloaded state matches the old
- TransactionSnapshot newState = txManager.getCurrentState();
- LOG.info("New state: " + newState);
- assertEquals(origState, newState);
-
- // commit tx2
- Assert.assertTrue(txManager.commit(tx2));
- // start another transaction, must be greater than tx3
- Transaction tx4 = txManager.startShort();
- Assert.assertTrue(tx4.getTransactionId() > tx3.getTransactionId());
- // tx1 must be visible from tx2, but tx3 and tx4 must not
- Assert.assertTrue(tx2.isVisible(tx1.getTransactionId()));
- Assert.assertFalse(tx2.isVisible(tx3.getTransactionId()));
- Assert.assertFalse(tx2.isVisible(tx4.getTransactionId()));
- // add same change for tx3
- Assert.assertFalse(txManager.canCommit(tx3, Collections.singleton(b)));
- // check visibility with new xaction
- Transaction tx5 = txManager.startShort();
- Assert.assertTrue(tx5.isVisible(tx1.getTransactionId()));
- Assert.assertTrue(tx5.isVisible(tx2.getTransactionId()));
- Assert.assertFalse(tx5.isVisible(tx3.getTransactionId()));
- Assert.assertFalse(tx5.isVisible(tx4.getTransactionId()));
- // can commit tx3?
- txManager.abort(tx3);
- txManager.abort(tx4);
- txManager.abort(tx5);
- // start new tx and verify its exclude list is empty
- Transaction tx6 = txManager.startShort();
- Assert.assertFalse(tx6.hasExcludes());
- txManager.abort(tx6);
-
- // now start 5 x claim size transactions
- Transaction tx = txManager.startShort();
- for (int i = 1; i < 50; i++) {
- tx = txManager.startShort();
- }
- origState = txManager.getCurrentState();
-
- Thread.sleep(100);
- // simulate crash by starting a new tx manager without a stopAndWait
- storage3 = getStorage(conf);
- txManager = new TransactionManager(conf, storage3, new TxMetricsCollector());
- txManager.startAndWait();
-
- // verify state again matches (this time should include WAL replay)
- newState = txManager.getCurrentState();
- assertEquals(origState, newState);
-
- // get a new transaction and verify it is greater
- Transaction txAfter = txManager.startShort();
- Assert.assertTrue(txAfter.getTransactionId() > tx.getTransactionId());
- } finally {
- if (storage != null) {
- storage.stopAndWait();
- }
- if (storage2 != null) {
- storage2.stopAndWait();
- }
- if (storage3 != null) {
- storage3.stopAndWait();
- }
- }
- }
-
- /**
- * Tests whether the committed set is advanced properly on WAL replay.
- */
- @Test
- public void testCommittedSetClearing() throws Exception {
- Configuration conf = getConfiguration("testCommittedSetClearing");
- conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 0); // no cleanup thread
- conf.setInt(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 0); // no periodic snapshots
-
- TransactionStateStorage storage1 = null;
- TransactionStateStorage storage2 = null;
- try {
- storage1 = getStorage(conf);
- TransactionManager txManager = new TransactionManager
- (conf, storage1, new TxMetricsCollector());
- txManager.startAndWait();
-
- // TODO: replace with new persistence tests
- final byte[] a = { 'a' };
- final byte[] b = { 'b' };
- // start a tx1, add a change A and commit
- Transaction tx1 = txManager.startShort();
- Assert.assertTrue(txManager.canCommit(tx1, Collections.singleton(a)));
- Assert.assertTrue(txManager.commit(tx1));
- // start a tx2 and add a change B
- Transaction tx2 = txManager.startShort();
- Assert.assertTrue(txManager.canCommit(tx2, Collections.singleton(b)));
- // start a tx3
- Transaction tx3 = txManager.startShort();
- TransactionSnapshot origState = txManager.getCurrentState();
- LOG.info("Orig state: " + origState);
-
- // simulate a failure by starting a new tx manager without stopping first
- storage2 = getStorage(conf);
- txManager = new TransactionManager(conf, storage2, new TxMetricsCollector());
- txManager.startAndWait();
-
- // check that the reloaded state matches the old
- TransactionSnapshot newState = txManager.getCurrentState();
- LOG.info("New state: " + newState);
- assertEquals(origState, newState);
-
- } finally {
- if (storage1 != null) {
- storage1.stopAndWait();
- }
- if (storage2 != null) {
- storage2.stopAndWait();
- }
- }
- }
-
- /**
- * Tests removal of old snapshots and old transaction logs.
- */
- @Test
- public void testOldFileRemoval() throws Exception {
- Configuration conf = getConfiguration("testOldFileRemoval");
- TransactionStateStorage storage = null;
- try {
- storage = getStorage(conf);
- storage.startAndWait();
- long now = System.currentTimeMillis();
- long writePointer = 1;
- Collection<Long> invalid = Lists.newArrayList();
- NavigableMap<Long, TransactionManager.InProgressTx> inprogress = Maps.newTreeMap();
- Map<Long, Set<ChangeId>> committing = Maps.newHashMap();
- Map<Long, Set<ChangeId>> committed = Maps.newHashMap();
- TransactionSnapshot snapshot = new TransactionSnapshot(now, 0, writePointer++, invalid,
- inprogress, committing, committed);
- TransactionEdit dummyEdit = TransactionEdit.createStarted(1, 0, Long.MAX_VALUE, TransactionType.SHORT);
-
- // write snapshot 1
- storage.writeSnapshot(snapshot);
- TransactionLog log = storage.createLog(now);
- log.append(dummyEdit);
- log.close();
-
- snapshot = new TransactionSnapshot(now + 1, 0, writePointer++, invalid, inprogress, committing, committed);
- // write snapshot 2
- storage.writeSnapshot(snapshot);
- log = storage.createLog(now + 1);
- log.append(dummyEdit);
- log.close();
-
- snapshot = new TransactionSnapshot(now + 2, 0, writePointer++, invalid, inprogress, committing, committed);
- // write snapshot 3
- storage.writeSnapshot(snapshot);
- log = storage.createLog(now + 2);
- log.append(dummyEdit);
- log.close();
-
- snapshot = new TransactionSnapshot(now + 3, 0, writePointer++, invalid, inprogress, committing, committed);
- // write snapshot 4
- storage.writeSnapshot(snapshot);
- log = storage.createLog(now + 3);
- log.append(dummyEdit);
- log.close();
-
- snapshot = new TransactionSnapshot(now + 4, 0, writePointer++, invalid, inprogress, committing, committed);
- // write snapshot 5
- storage.writeSnapshot(snapshot);
- log = storage.createLog(now + 4);
- log.append(dummyEdit);
- log.close();
-
- snapshot = new TransactionSnapshot(now + 5, 0, writePointer++, invalid, inprogress, committing, committed);
- // write snapshot 6
- storage.writeSnapshot(snapshot);
- log = storage.createLog(now + 5);
- log.append(dummyEdit);
- log.close();
-
- List<String> allSnapshots = storage.listSnapshots();
- LOG.info("All snapshots: " + allSnapshots);
- assertEquals(6, allSnapshots.size());
- List<String> allLogs = storage.listLogs();
- LOG.info("All logs: " + allLogs);
- assertEquals(6, allLogs.size());
-
- long oldestKept = storage.deleteOldSnapshots(3);
- assertEquals(now + 3, oldestKept);
- allSnapshots = storage.listSnapshots();
- LOG.info("All snapshots: " + allSnapshots);
- assertEquals(3, allSnapshots.size());
-
- storage.deleteLogsOlderThan(oldestKept);
- allLogs = storage.listLogs();
- LOG.info("All logs: " + allLogs);
- assertEquals(3, allLogs.size());
- } finally {
- if (storage != null) {
- storage.stopAndWait();
- }
- }
- }
-
- @Test
- public void testLongTxnEditReplay() throws Exception {
- Configuration conf = getConfiguration("testLongTxnEditReplay");
- TransactionStateStorage storage = null;
- try {
- storage = getStorage(conf);
- storage.startAndWait();
-
- // Create long running txns. Abort one of them, invalidate another, invalidate and abort the last.
- long time1 = System.currentTimeMillis();
- long wp1 = time1 * TxConstants.MAX_TX_PER_MS;
- TransactionEdit edit1 = TransactionEdit.createStarted(wp1, wp1 - 10, time1 + 100000, TransactionType.LONG);
- TransactionEdit edit2 = TransactionEdit.createAborted(wp1, TransactionType.LONG, null);
-
- long time2 = time1 + 100;
- long wp2 = time2 * TxConstants.MAX_TX_PER_MS;
- TransactionEdit edit3 = TransactionEdit.createStarted(wp2, wp2 - 10, time2 + 100000, TransactionType.LONG);
- TransactionEdit edit4 = TransactionEdit.createInvalid(wp2);
-
- long time3 = time1 + 200;
- long wp3 = time3 * TxConstants.MAX_TX_PER_MS;
- TransactionEdit edit5 = TransactionEdit.createStarted(wp3, wp3 - 10, time3 + 100000, TransactionType.LONG);
- TransactionEdit edit6 = TransactionEdit.createInvalid(wp3);
- TransactionEdit edit7 = TransactionEdit.createAborted(wp3, TransactionType.LONG, null);
-
- // write transaction edits
- TransactionLog log = storage.createLog(time1);
- log.append(edit1);
- log.append(edit2);
- log.append(edit3);
- log.append(edit4);
- log.append(edit5);
- log.append(edit6);
- log.append(edit7);
- log.close();
-
- // Start transaction manager
- TransactionManager txm = new TransactionManager(conf, storage, new TxMetricsCollector());
- txm.startAndWait();
- try {
- // Verify that all txns are in invalid list.
- TransactionSnapshot snapshot1 = txm.getCurrentState();
- Assert.assertEquals(ImmutableList.of(wp1, wp2, wp3), snapshot1.getInvalid());
- Assert.assertEquals(0, snapshot1.getInProgress().size());
- Assert.assertEquals(0, snapshot1.getCommittedChangeSets().size());
- Assert.assertEquals(0, snapshot1.getCommittedChangeSets().size());
- } finally {
- txm.stopAndWait();
- }
- } finally {
- if (storage != null) {
- storage.stopAndWait();
- }
- }
- }
-
- @Test
- public void testTruncateInvalidTxEditReplay() throws Exception {
- Configuration conf = getConfiguration("testTruncateInvalidTxEditReplay");
- TransactionStateStorage storage = null;
- try {
- storage = getStorage(conf);
- storage.startAndWait();
-
- // Create some txns, and invalidate all of them.
- long time1 = System.currentTimeMillis();
- long wp1 = time1 * TxConstants.MAX_TX_PER_MS;
- TransactionEdit edit1 = TransactionEdit.createStarted(wp1, wp1 - 10, time1 + 100000, TransactionType.LONG);
- TransactionEdit edit2 = TransactionEdit.createInvalid(wp1);
-
- long time2 = time1 + 100;
- long wp2 = time2 * TxConstants.MAX_TX_PER_MS;
- TransactionEdit edit3 = TransactionEdit.createStarted(wp2, wp2 - 10, time2 + 10000, TransactionType.SHORT);
- TransactionEdit edit4 = TransactionEdit.createInvalid(wp2);
-
- long time3 = time1 + 2000;
- long wp3 = time3 * TxConstants.MAX_TX_PER_MS;
- TransactionEdit edit5 = TransactionEdit.createStarted(wp3, wp3 - 10, time3 + 100000, TransactionType.LONG);
- TransactionEdit edit6 = TransactionEdit.createInvalid(wp3);
-
- long time4 = time1 + 2100;
- long wp4 = time4 * TxConstants.MAX_TX_PER_MS;
- TransactionEdit edit7 = TransactionEdit.createStarted(wp4, wp4 - 10, time4 + 10000, TransactionType.SHORT);
- TransactionEdit edit8 = TransactionEdit.createInvalid(wp4);
-
- // remove wp1 and wp3 from invalid list
- TransactionEdit edit9 = TransactionEdit.createTruncateInvalidTx(ImmutableSet.of(wp1, wp3));
- // truncate invalid transactions before time3
- TransactionEdit edit10 = TransactionEdit.createTruncateInvalidTxBefore(time3);
-
- // write transaction edits
- TransactionLog log = storage.createLog(time1);
- log.append(edit1);
- log.append(edit2);
- log.append(edit3);
- log.append(edit4);
- log.append(edit5);
- log.append(edit6);
- log.append(edit7);
- log.append(edit8);
- log.append(edit9);
- log.append(edit10);
- log.close();
-
- // Start transaction manager
- TransactionManager txm = new TransactionManager(conf, storage, new TxMetricsCollector());
- txm.startAndWait();
- try {
- // Only wp4 should be in invalid list.
- TransactionSnapshot snapshot = txm.getCurrentState();
- Assert.assertEquals(ImmutableList.of(wp4), snapshot.getInvalid());
- Assert.assertEquals(0, snapshot.getInProgress().size());
- Assert.assertEquals(0, snapshot.getCommittedChangeSets().size());
- Assert.assertEquals(0, snapshot.getCommittedChangeSets().size());
- } finally {
- txm.stopAndWait();
- }
- } finally {
- if (storage != null) {
- storage.stopAndWait();
- }
- }
- }
-
- /**
- * Generates a new snapshot object with semi-randomly populated values. This does not necessarily accurately
- * represent a typical snapshot's distribution of values, as we only set an upper bound on pointer values.
- *
- * We generate a new snapshot with the contents:
- * <ul>
- * <li>readPointer = 1M + (random % 1M)</li>
- * <li>writePointer = readPointer + 1000</li>
- * <li>waterMark = writePointer + 1000</li>
- * <li>inProgress = one each for (writePointer - 500)..writePointer, ~ 5% "long" transaction</li>
- * <li>invalid = 100 randomly distributed, 0..1M</li>
- * <li>committing = one each, (readPointer + 1)..(readPointer + 100)</li>
- * <li>committed = one each, (readPointer - 1000)..readPointer</li>
- * </ul>
- * @return a new snapshot of transaction state.
- */
- private TransactionSnapshot createRandomSnapshot() {
- // limit readPointer to a reasonable range, but make it > 1M so we can assign enough keys below
- long readPointer = (Math.abs(random.nextLong()) % 1000000L) + 1000000L;
- long writePointer = readPointer + 1000L;
-
- // generate in progress -- assume last 500 write pointer values
- NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap();
- long startPointer = writePointer - 500L;
- for (int i = 0; i < 500; i++) {
- long currentTime = System.currentTimeMillis();
- // make some "long" transactions
- if (i % 20 == 0) {
- inProgress.put(startPointer + i,
- new TransactionManager.InProgressTx(startPointer - 1, currentTime + TimeUnit.DAYS.toSeconds(1),
- TransactionType.LONG));
- } else {
- inProgress.put(startPointer + i,
- new TransactionManager.InProgressTx(startPointer - 1, currentTime + 300000L,
- TransactionType.SHORT));
- }
- }
-
- // make 100 random invalid IDs
- LongArrayList invalid = new LongArrayList();
- for (int i = 0; i < 100; i++) {
- invalid.add(Math.abs(random.nextLong()) % 1000000L);
- }
-
- // make 100 committing entries, 10 keys each
- Map<Long, Set<ChangeId>> committing = Maps.newHashMap();
- for (int i = 0; i < 100; i++) {
- committing.put(readPointer + i, generateChangeSet(10));
- }
-
- // make 1000 committed entries, 10 keys each
- long startCommitted = readPointer - 1000L;
- NavigableMap<Long, Set<ChangeId>> committed = Maps.newTreeMap();
- for (int i = 0; i < 1000; i++) {
- committed.put(startCommitted + i, generateChangeSet(10));
- }
-
- return new TransactionSnapshot(System.currentTimeMillis(), readPointer, writePointer,
- invalid, inProgress, committing, committed);
- }
-
- private Set<ChangeId> generateChangeSet(int numEntries) {
- Set<ChangeId> changes = Sets.newHashSet();
- for (int i = 0; i < numEntries; i++) {
- byte[] bytes = new byte[8];
- random.nextBytes(bytes);
- changes.add(new ChangeId(bytes));
- }
- return changes;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/persist/CommitMarkerCodecTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/persist/CommitMarkerCodecTest.java b/tephra-core/src/test/java/co/cask/tephra/persist/CommitMarkerCodecTest.java
deleted file mode 100644
index da876fa..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/persist/CommitMarkerCodecTest.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.TxConstants;
-import com.google.common.primitives.Ints;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-/**
- * Unit Test for {@link CommitMarkerCodec}.
- */
-public class CommitMarkerCodecTest {
-
- @ClassRule
- public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
- private static final String LOG_FILE = "txlog";
- private static final Random RANDOM = new Random();
-
- private static MiniDFSCluster dfsCluster;
- private static Configuration conf;
- private static FileSystem fs;
-
- @BeforeClass
- public static void setupBeforeClass() throws Exception {
- Configuration hConf = new Configuration();
- hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TMP_FOLDER.newFolder().getAbsolutePath());
-
- dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build();
- conf = new Configuration(dfsCluster.getFileSystem().getConf());
- fs = FileSystem.newInstance(FileSystem.getDefaultUri(conf), conf);
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- dfsCluster.shutdown();
- }
-
- @Test
- public void testRandomCommitMarkers() throws Exception {
- List<Integer> randomInts = new ArrayList<>();
- Path newLog = new Path(TMP_FOLDER.newFolder().getAbsolutePath(), LOG_FILE);
-
- // Write a bunch of random commit markers
- try (SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, newLog, LongWritable.class,
- LongWritable.class,
- SequenceFile.CompressionType.NONE)) {
- for (int i = 0; i < 1000; i++) {
- int randomNum = RANDOM.nextInt(Integer.MAX_VALUE);
- CommitMarkerCodec.writeMarker(writer, randomNum);
- randomInts.add(randomNum);
- }
- writer.hflush();
- writer.hsync();
- }
-
- // Read the commit markers back to verify the marker
- try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, newLog, conf);
- CommitMarkerCodec markerCodec = new CommitMarkerCodec()) {
- for (int num : randomInts) {
- Assert.assertEquals(num, markerCodec.readMarker(reader));
- }
- }
- }
-
- private static class IncompleteValueBytes implements SequenceFile.ValueBytes {
-
- @Override
- public void writeUncompressedBytes(DataOutputStream outStream) throws IOException {
- // don't write anything to simulate incomplete record
- }
-
- @Override
- public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException {
- throw new IllegalArgumentException("Not possible");
- }
-
- @Override
- public int getSize() {
- return Ints.BYTES;
- }
- }
-
- @Test
- public void testIncompleteCommitMarker() throws Exception {
- Path newLog = new Path(TMP_FOLDER.newFolder().getAbsolutePath(), LOG_FILE);
- try (SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, newLog, LongWritable.class,
- LongWritable.class,
- SequenceFile.CompressionType.NONE)) {
- String key = TxConstants.TransactionLog.NUM_ENTRIES_APPENDED;
- SequenceFile.ValueBytes valueBytes = new IncompleteValueBytes();
- writer.appendRaw(key.getBytes(), 0, key.length(), valueBytes);
- writer.hflush();
- writer.hsync();
- }
-
- // Read the incomplete commit marker
- try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, newLog, conf);
- CommitMarkerCodec markerCodec = new CommitMarkerCodec()) {
- try {
- markerCodec.readMarker(reader);
- Assert.fail("Expected EOF Exception to be thrown");
- } catch (EOFException e) {
- // expected since we didn't write the value bytes
- }
- }
- }
-
- @Test
- public void testIncorrectCommitMarker() throws Exception {
- Path newLog = new Path(TMP_FOLDER.newFolder().getAbsolutePath(), LOG_FILE);
-
- // Write an incorrect marker
- try (SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, newLog, LongWritable.class,
- LongWritable.class,
- SequenceFile.CompressionType.NONE)) {
- String invalidKey = "IncorrectKey";
- SequenceFile.ValueBytes valueBytes = new CommitMarkerCodec.CommitEntriesCount(100);
- writer.appendRaw(invalidKey.getBytes(), 0, invalidKey.length(), valueBytes);
- writer.hflush();
- writer.hsync();
- }
-
- // Read the commit markers back to verify the marker
- try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, newLog, conf);
- CommitMarkerCodec markerCodec = new CommitMarkerCodec()) {
- try {
- markerCodec.readMarker(reader);
- Assert.fail("Expected an IOException to be thrown");
- } catch (IOException e) {
- // expected
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionLogTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionLogTest.java b/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionLogTest.java
deleted file mode 100644
index 96015d1..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionLogTest.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.metrics.MetricsCollector;
-import co.cask.tephra.metrics.TxMetricsCollector;
-import co.cask.tephra.util.TransactionEditUtil;
-import com.google.common.io.Closeables;
-import com.google.common.primitives.Longs;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-
-/**
- * Testing for complete and partial sycs of {@link TransactionEdit} to {@link HDFSTransactionLog}
- */
-public class HDFSTransactionLogTest {
- @ClassRule
- public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
- private static final String LOG_FILE_PREFIX = "txlog.";
-
- private static MiniDFSCluster dfsCluster;
- private static Configuration conf;
- private static MetricsCollector metricsCollector;
-
- @BeforeClass
- public static void setupBeforeClass() throws Exception {
- Configuration hConf = new Configuration();
- hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TMP_FOLDER.newFolder().getAbsolutePath());
-
- dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build();
- conf = new Configuration(dfsCluster.getFileSystem().getConf());
- metricsCollector = new TxMetricsCollector();
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- dfsCluster.shutdown();
- }
-
- private Configuration getConfiguration() throws IOException {
- // tests should use the current user for HDFS
- conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
- conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, TMP_FOLDER.newFolder().getAbsolutePath());
- return conf;
- }
-
- private HDFSTransactionLog getHDFSTransactionLog(Configuration conf,
- FileSystem fs, long timeInMillis) throws Exception {
- String snapshotDir = conf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR);
- Path newLog = new Path(snapshotDir, LOG_FILE_PREFIX + timeInMillis);
- return new HDFSTransactionLog(fs, conf, newLog, timeInMillis, metricsCollector);
- }
-
- private SequenceFile.Writer getSequenceFileWriter(Configuration configuration, FileSystem fs,
- long timeInMillis, boolean withMarker) throws IOException {
- String snapshotDir = configuration.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR);
- Path newLog = new Path(snapshotDir, LOG_FILE_PREFIX + timeInMillis);
- SequenceFile.Metadata metadata = new SequenceFile.Metadata();
- if (withMarker) {
- metadata.set(new Text(TxConstants.TransactionLog.VERSION_KEY),
- new Text(Byte.toString(TxConstants.TransactionLog.CURRENT_VERSION)));
- }
- return SequenceFile.createWriter(fs, configuration, newLog, LongWritable.class,
- TransactionEdit.class, SequenceFile.CompressionType.NONE, null, null, metadata);
- }
-
- private void writeNumWrites(SequenceFile.Writer writer, final int size) throws Exception {
- String key = TxConstants.TransactionLog.NUM_ENTRIES_APPENDED;
- CommitMarkerCodec.writeMarker(writer, size);
- }
-
- private void testTransactionLogSync(int totalCount, int batchSize, boolean withMarker, boolean isComplete)
- throws Exception {
- List<TransactionEdit> edits = TransactionEditUtil.createRandomEdits(totalCount);
- long timestamp = System.currentTimeMillis();
- Configuration configuration = getConfiguration();
- FileSystem fs = FileSystem.newInstance(FileSystem.getDefaultUri(configuration), configuration);
- SequenceFile.Writer writer = getSequenceFileWriter(configuration, fs, timestamp, withMarker);
- AtomicLong logSequence = new AtomicLong();
- HDFSTransactionLog transactionLog = getHDFSTransactionLog(configuration, fs, timestamp);
- AbstractTransactionLog.Entry entry;
-
- for (int i = 0; i < totalCount - batchSize; i += batchSize) {
- if (withMarker) {
- writeNumWrites(writer, batchSize);
- }
- for (int j = 0; j < batchSize; j++) {
- entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(j));
- writer.append(entry.getKey(), entry.getEdit());
- }
- writer.syncFs();
- }
-
- if (withMarker) {
- writeNumWrites(writer, batchSize);
- }
-
- for (int i = totalCount - batchSize; i < totalCount - 1; i++) {
- entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(i));
- writer.append(entry.getKey(), entry.getEdit());
- }
-
- entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()),
- edits.get(totalCount - 1));
- if (isComplete) {
- writer.append(entry.getKey(), entry.getEdit());
- } else {
- byte[] bytes = Longs.toByteArray(entry.getKey().get());
- writer.appendRaw(bytes, 0, bytes.length, new SequenceFile.ValueBytes() {
- @Override
- public void writeUncompressedBytes(DataOutputStream outStream) throws IOException {
- byte[] test = new byte[]{0x2};
- outStream.write(test, 0, 1);
- }
-
- @Override
- public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException {
- // no-op
- }
-
- @Override
- public int getSize() {
- // mimic size longer than the actual byte array size written, so we would reach EOF
- return 12;
- }
- });
- }
- writer.syncFs();
- Closeables.closeQuietly(writer);
-
- // now let's try to read this log
- TransactionLogReader reader = transactionLog.getReader();
- int syncedEdits = 0;
- while (reader.next() != null) {
- // testing reading the transaction edits
- syncedEdits++;
- }
- if (isComplete) {
- Assert.assertEquals(totalCount, syncedEdits);
- } else {
- Assert.assertEquals(totalCount - batchSize, syncedEdits);
- }
- }
-
- @Test
- public void testTransactionLogNewVersion() throws Exception {
- // in-complete sync
- testTransactionLogSync(1000, 1, true, false);
- testTransactionLogSync(2000, 5, true, false);
-
- // complete sync
- testTransactionLogSync(1000, 1, true, true);
- testTransactionLogSync(2000, 5, true, true);
- }
-
- @Test
- public void testTransactionLogOldVersion() throws Exception {
- // in-complete sync
- testTransactionLogSync(1000, 1, false, false);
-
- // complete sync
- testTransactionLogSync(2000, 5, false, true);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionStateStorageTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionStateStorageTest.java b/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionStateStorageTest.java
deleted file mode 100644
index 30ce455..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionStateStorageTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.metrics.TxMetricsCollector;
-import co.cask.tephra.snapshot.SnapshotCodecProvider;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.IOException;
-
-
-/**
- * Tests persistence of transaction snapshots and write-ahead logs to HDFS storage, using the
- * {@link HDFSTransactionStateStorage} and {@link HDFSTransactionLog} implementations.
- */
-public class HDFSTransactionStateStorageTest extends AbstractTransactionStateStorageTest {
-
- @ClassRule
- public static TemporaryFolder tmpFolder = new TemporaryFolder();
-
- private static MiniDFSCluster dfsCluster;
- private static Configuration conf;
-
- @BeforeClass
- public static void setupBeforeClass() throws Exception {
- Configuration hConf = new Configuration();
- hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.newFolder().getAbsolutePath());
-
- dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build();
- conf = new Configuration(dfsCluster.getFileSystem().getConf());
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- dfsCluster.shutdown();
- }
-
- @Override
- protected Configuration getConfiguration(String testName) throws IOException {
- // tests should use the current user for HDFS
- conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
- conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
- return conf;
- }
-
- @Override
- protected AbstractTransactionStateStorage getStorage(Configuration conf) {
- return new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/persist/InMemoryTransactionStateStorage.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/persist/InMemoryTransactionStateStorage.java b/tephra-core/src/test/java/co/cask/tephra/persist/InMemoryTransactionStateStorage.java
deleted file mode 100644
index 1b69c7a..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/persist/InMemoryTransactionStateStorage.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.AbstractIdleService;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import javax.annotation.Nullable;
-
-/**
- * Stores the latest transaction snapshot and logs in memory.
- */
-public class InMemoryTransactionStateStorage extends AbstractIdleService implements TransactionStateStorage {
- // only keeps the most recent snapshot in memory
- private TransactionSnapshot lastSnapshot;
-
- private NavigableMap<Long, TransactionLog> logs = new TreeMap<>();
-
- @Override
- protected void startUp() throws Exception {
- }
-
- @Override
- protected void shutDown() throws Exception {
- lastSnapshot = null;
- logs = new TreeMap<>();
- }
-
- @Override
- public void writeSnapshot(OutputStream out, TransactionSnapshot snapshot) throws IOException {
- // no codecs in in-memory mode
- }
-
- @Override
- public void writeSnapshot(TransactionSnapshot snapshot) throws IOException {
- lastSnapshot = snapshot;
- }
-
- @Override
- public TransactionSnapshot getLatestSnapshot() throws IOException {
- return lastSnapshot;
- }
-
- @Override
- public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException {
- return lastSnapshot;
- }
-
- @Override
- public long deleteOldSnapshots(int numberToKeep) throws IOException {
- // always only keep the last snapshot
- return lastSnapshot.getTimestamp();
- }
-
- @Override
- public List<String> listSnapshots() throws IOException {
- List<String> snapshots = Lists.newArrayListWithCapacity(1);
- if (lastSnapshot != null) {
- snapshots.add(Long.toString(lastSnapshot.getTimestamp()));
- }
- return snapshots;
- }
-
- @Override
- public List<TransactionLog> getLogsSince(long timestamp) throws IOException {
- return Lists.newArrayList(logs.tailMap(timestamp).values());
- }
-
- @Override
- public TransactionLog createLog(long timestamp) throws IOException {
- TransactionLog log = new InMemoryTransactionLog(timestamp);
- logs.put(timestamp, log);
- return log;
- }
-
- @Override
- public void deleteLogsOlderThan(long timestamp) throws IOException {
- Iterator<Map.Entry<Long, TransactionLog>> logIter = logs.entrySet().iterator();
- while (logIter.hasNext()) {
- Map.Entry<Long, TransactionLog> logEntry = logIter.next();
- if (logEntry.getKey() < timestamp) {
- logIter.remove();
- }
- }
- }
-
- @Override
- public void setupStorage() throws IOException {
- }
-
- @Override
- public List<String> listLogs() throws IOException {
- return Lists.transform(Lists.newArrayList(logs.keySet()), new Function<Long, String>() {
- @Nullable
- @Override
- public String apply(@Nullable Long input) {
- return input.toString();
- }
- });
- }
-
- @Override
- public String getLocation() {
- return "in-memory";
- }
-
- public static class InMemoryTransactionLog implements TransactionLog {
- private long timestamp;
- private List<TransactionEdit> edits = Lists.newArrayList();
- boolean isClosed = false;
- public InMemoryTransactionLog(long timestamp) {
- this.timestamp = timestamp;
- }
-
- @Override
- public String getName() {
- return "in-memory@" + timestamp;
- }
-
- @Override
- public long getTimestamp() {
- return timestamp;
- }
-
- @Override
- public void append(TransactionEdit edit) throws IOException {
- if (isClosed) {
- throw new IOException("Log is closed");
- }
- edits.add(edit);
- }
-
- @Override
- public void append(List<TransactionEdit> edits) throws IOException {
- if (isClosed) {
- throw new IOException("Log is closed");
- }
- edits.addAll(edits);
- }
-
- @Override
- public void close() {
- isClosed = true;
- }
-
- @Override
- public TransactionLogReader getReader() throws IOException {
- return new InMemoryLogReader(edits.iterator());
- }
- }
-
- public static class InMemoryLogReader implements TransactionLogReader {
- private final Iterator<TransactionEdit> editIterator;
-
- public InMemoryLogReader(Iterator<TransactionEdit> editIterator) {
- this.editIterator = editIterator;
- }
-
- @Override
- public TransactionEdit next() throws IOException {
- if (editIterator.hasNext()) {
- return editIterator.next();
- }
- return null;
- }
-
- @Override
- public TransactionEdit next(TransactionEdit reuse) throws IOException {
- return next();
- }
-
- @Override
- public void close() throws IOException {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/persist/LocalTransactionStateStorageTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/persist/LocalTransactionStateStorageTest.java b/tephra-core/src/test/java/co/cask/tephra/persist/LocalTransactionStateStorageTest.java
deleted file mode 100644
index e2886ae..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/persist/LocalTransactionStateStorageTest.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.ChangeId;
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TransactionType;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.metrics.TxMetricsCollector;
-import co.cask.tephra.snapshot.DefaultSnapshotCodec;
-import co.cask.tephra.snapshot.SnapshotCodecProvider;
-import co.cask.tephra.snapshot.SnapshotCodecV4;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSortedSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Assert;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.DataOutput;
-import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Runs transaction persistence tests against the {@link LocalFileTransactionStateStorage} and
- * {@link LocalFileTransactionLog} implementations.
- */
-public class LocalTransactionStateStorageTest extends AbstractTransactionStateStorageTest {
- @ClassRule
- public static TemporaryFolder tmpDir = new TemporaryFolder();
-
- @Override
- protected Configuration getConfiguration(String testName) throws IOException {
- File testDir = tmpDir.newFolder(testName);
- Configuration conf = new Configuration();
- conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath());
- conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName());
- return conf;
- }
-
- @Override
- protected AbstractTransactionStateStorage getStorage(Configuration conf) {
- return new LocalFileTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
- }
-
- // v2 TransactionEdit
- @SuppressWarnings("deprecation")
- private class TransactionEditV2 extends TransactionEdit {
- public TransactionEditV2(long writePointer, long visibilityUpperBound, State state, long expirationDate,
- Set<ChangeId> changes, long commitPointer, boolean canCommit, TransactionType type) {
- super(writePointer, visibilityUpperBound, state, expirationDate, changes, commitPointer, canCommit, type,
- null, 0L, 0L, null);
- }
- @Override
- public void write(DataOutput out) throws IOException {
- TransactionEditCodecs.encode(this, out, new TransactionEditCodecs.TransactionEditCodecV2());
- }
- }
-
- // Note: this test cannot run in AbstractTransactionStateStorageTest, since SequenceFile throws exception saying
- // TransactionEditV2 is not TransactionEdit. Since the code path this test is verifying is the same path between
- // HDFS and Local Storage, having this only over here is fine.
- @SuppressWarnings("deprecation")
- @Test
- public void testLongTxnBackwardsCompatibility() throws Exception {
- Configuration conf = getConfiguration("testLongTxnBackwardsCompatibility");
-
- // Use SnapshotCodec version 1
- String latestSnapshotCodec = conf.get(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES);
- conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
-
- TransactionStateStorage storage = null;
- try {
- storage = getStorage(conf);
- storage.startAndWait();
-
- // Create transaction snapshot and transaction edits with version when long running txns had -1 expiration.
- Collection<Long> invalid = Lists.newArrayList();
- NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap();
- long time1 = System.currentTimeMillis();
- long wp1 = time1 * TxConstants.MAX_TX_PER_MS;
- inProgress.put(wp1, new TransactionManager.InProgressTx(wp1 - 5, -1L));
- long time2 = time1 + 100;
- long wp2 = time2 * TxConstants.MAX_TX_PER_MS;
- inProgress.put(wp2, new TransactionManager.InProgressTx(wp2 - 50, time2 + 1000));
- Map<Long, Set<ChangeId>> committing = Maps.newHashMap();
- Map<Long, Set<ChangeId>> committed = Maps.newHashMap();
- TransactionSnapshot snapshot = new TransactionSnapshot(time2, 0, wp2, invalid,
- inProgress, committing, committed);
- long time3 = time1 + 200;
- long wp3 = time3 * TxConstants.MAX_TX_PER_MS;
- TransactionEdit edit1 = new TransactionEditV2(wp3, wp3 - 10, TransactionEdit.State.INPROGRESS, -1L,
- null, 0L, false, null);
- long time4 = time1 + 300;
- long wp4 = time4 * TxConstants.MAX_TX_PER_MS;
- TransactionEdit edit2 = new TransactionEditV2(wp4, wp4 - 10, TransactionEdit.State.INPROGRESS, time4 + 1000,
- null, 0L, false, null);
-
- // write snapshot and transaction edit
- storage.writeSnapshot(snapshot);
- TransactionLog log = storage.createLog(time2);
- log.append(edit1);
- log.append(edit2);
- log.close();
-
- // Start transaction manager
- conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, latestSnapshotCodec);
- long longTimeout = TimeUnit.SECONDS.toMillis(conf.getLong(TxConstants.Manager.CFG_TX_LONG_TIMEOUT,
- TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT));
- TransactionManager txm = new TransactionManager(conf, storage, new TxMetricsCollector());
- txm.startAndWait();
- try {
- // Verify that the txns in old format were read correctly.
- // There should be four in-progress transactions, and no invalid transactions
- TransactionSnapshot snapshot1 = txm.getCurrentState();
- Assert.assertEquals(ImmutableSortedSet.of(wp1, wp2, wp3, wp4), snapshot1.getInProgress().keySet());
- verifyInProgress(snapshot1.getInProgress().get(wp1), TransactionType.LONG, time1 + longTimeout);
- verifyInProgress(snapshot1.getInProgress().get(wp2), TransactionType.SHORT, time2 + 1000);
- verifyInProgress(snapshot1.getInProgress().get(wp3), TransactionType.LONG, time3 + longTimeout);
- verifyInProgress(snapshot1.getInProgress().get(wp4), TransactionType.SHORT, time4 + 1000);
- Assert.assertEquals(0, snapshot1.getInvalid().size());
- } finally {
- txm.stopAndWait();
- }
- } finally {
- if (storage != null) {
- storage.stopAndWait();
- }
- }
- }
-
- // Note: this test cannot run in AbstractTransactionStateStorageTest, since SequenceFile throws exception saying
- // TransactionEditV2 is not TransactionEdit. Since the code path this test is verifying is the same path between
- // HDFS and Local Storage, having this only over here is fine.
- @SuppressWarnings("deprecation")
- @Test
- public void testAbortEditBackwardsCompatibility() throws Exception {
- Configuration conf = getConfiguration("testAbortEditBackwardsCompatibility");
-
- TransactionStateStorage storage = null;
- try {
- storage = getStorage(conf);
- storage.startAndWait();
-
- // Create edits for transaction type addition to abort
- long time1 = System.currentTimeMillis();
- long wp1 = time1 * TxConstants.MAX_TX_PER_MS;
- TransactionEdit edit1 = new TransactionEditV2(wp1, wp1 - 10, TransactionEdit.State.INPROGRESS, -1L,
- null, 0L, false, null);
- TransactionEdit edit2 = new TransactionEditV2(wp1, 0L, TransactionEdit.State.ABORTED, 0L,
- null, 0L, false, null);
-
- long time2 = time1 + 400;
- long wp2 = time2 * TxConstants.MAX_TX_PER_MS;
- TransactionEdit edit3 = new TransactionEditV2(wp2, wp2 - 10, TransactionEdit.State.INPROGRESS, time2 + 10000,
- null, 0L, false, null);
- TransactionEdit edit4 = new TransactionEditV2(wp2, 0L, TransactionEdit.State.INVALID, 0L, null, 0L, false, null);
- // Simulate case where we cannot determine txn state during abort
- TransactionEdit edit5 = new TransactionEditV2(wp2, 0L, TransactionEdit.State.ABORTED, 0L, null, 0L, false, null);
-
- // write snapshot and transaction edit
- TransactionLog log = storage.createLog(time1);
- log.append(edit1);
- log.append(edit2);
- log.append(edit3);
- log.append(edit4);
- log.append(edit5);
- log.close();
-
- // Start transaction manager
- TransactionManager txm = new TransactionManager(conf, storage, new TxMetricsCollector());
- txm.startAndWait();
- try {
- // Verify that the txns in old format were read correctly.
- // Both transactions should be in invalid state
- TransactionSnapshot snapshot1 = txm.getCurrentState();
- Assert.assertEquals(ImmutableList.of(wp1, wp2), snapshot1.getInvalid());
- Assert.assertEquals(0, snapshot1.getInProgress().size());
- Assert.assertEquals(0, snapshot1.getCommittedChangeSets().size());
- Assert.assertEquals(0, snapshot1.getCommittingChangeSets().size());
- } finally {
- txm.stopAndWait();
- }
- } finally {
- if (storage != null) {
- storage.stopAndWait();
- }
- }
- }
-
- private void verifyInProgress(TransactionManager.InProgressTx inProgressTx, TransactionType type,
- long expiration) throws Exception {
- Assert.assertEquals(type, inProgressTx.getType());
- Assert.assertTrue(inProgressTx.getExpiration() == expiration);
- }
-}