You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/06 23:02:40 UTC
[34/51] [partial] incubator-tephra git commit: Rename package to
org.apache.tephra
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionType.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionType.java b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionType.java
new file mode 100644
index 0000000..8abfc3b
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionType.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Autogenerated by Thrift Compiler (0.9.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.tephra.distributed.thrift;
+
+
+/**
+ * Transaction type as declared in the Thrift IDL. Each constant carries the integer value assigned to it in the
+ * IDL; {@link #findByValue(int)} maps such an integer back to a constant.
+ *
+ * NOTE: produced by the Thrift compiler (see the file header); hand edits are lost when the IDL is regenerated.
+ */
+public enum TTransactionType implements org.apache.thrift.TEnum {
+ SHORT(1),
+ LONG(2);
+
+ // Integer value assigned to this constant in the Thrift IDL.
+ private final int value;
+
+ private TTransactionType(int value) {
+ this.value = value;
+ }
+
+ /**
+ * Get the integer value of this enum value, as defined in the Thrift IDL.
+ *
+ * @return the IDL integer for this constant
+ */
+ public int getValue() {
+ return value;
+ }
+
+ /**
+ * Find the enum constant matching the given integer value, as defined in the Thrift IDL.
+ *
+ * @param value the IDL integer to look up
+ * @return the matching constant, or null if the value is not found.
+ */
+ public static TTransactionType findByValue(int value) {
+ switch (value) {
+ case 1:
+ return SHORT;
+ case 2:
+ return LONG;
+ default:
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TVisibilityLevel.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TVisibilityLevel.java b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TVisibilityLevel.java
new file mode 100644
index 0000000..ed081b5
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TVisibilityLevel.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Autogenerated by Thrift Compiler (0.9.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.tephra.distributed.thrift;
+
+
+/**
+ * Transaction visibility level as declared in the Thrift IDL. Each constant carries the integer value assigned to
+ * it in the IDL; {@link #findByValue(int)} maps such an integer back to a constant.
+ * NOTE(review): the semantics of each level are defined by the IDL / transaction model, not in this file.
+ *
+ * NOTE: produced by the Thrift compiler (see the file header); hand edits are lost when the IDL is regenerated.
+ */
+public enum TVisibilityLevel implements org.apache.thrift.TEnum {
+ SNAPSHOT(1),
+ SNAPSHOT_EXCLUDE_CURRENT(2),
+ SNAPSHOT_ALL(3);
+
+ // Integer value assigned to this constant in the Thrift IDL.
+ private final int value;
+
+ private TVisibilityLevel(int value) {
+ this.value = value;
+ }
+
+ /**
+ * Get the integer value of this enum value, as defined in the Thrift IDL.
+ *
+ * @return the IDL integer for this constant
+ */
+ public int getValue() {
+ return value;
+ }
+
+ /**
+ * Find the enum constant matching the given integer value, as defined in the Thrift IDL.
+ *
+ * @param value the IDL integer to look up
+ * @return the matching constant, or null if the value is not found.
+ */
+ public static TVisibilityLevel findByValue(int value) {
+ switch (value) {
+ case 1:
+ return SNAPSHOT;
+ case 2:
+ return SNAPSHOT_EXCLUDE_CURRENT;
+ case 3:
+ return SNAPSHOT_ALL;
+ default:
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
new file mode 100644
index 0000000..c8bf22a
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.inmemory;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.tephra.InvalidTruncateTimeException;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionCouldNotTakeSnapshotException;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.TransactionType;
+import org.apache.tephra.TxConstants;
+
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Implementation of the tx system client that doesn't talk to any global service and tries to do its best to meet the
+ * tx system requirements/expectations. In fact it implements enough logic to support running flows (when each flowlet
+ * uses its own detached tx system client, without talking to each other and sharing any state) with "process exactly
+ * once" guarantee if no failures happen.
+ *
+ * <p>NOTE: Will NOT detect conflicts. May leave inconsistent state when process crashes. Does NOT provide even read
+ * isolation guarantees.</p>
+ *
+ * <p>Good for performance testing. For demoing high throughput. For use-cases with relaxed tx guarantees.</p>
+ *
+ * <p>Write pointers come from an {@link AtomicLong}, so one instance may be used from several threads; every other
+ * operation keeps no state of its own.</p>
+ */
+public class DetachedTxSystemClient implements TransactionSystemClient {
+  // Dataset and queue logic relies on tx ids growing monotonically even after a restart, so the generator is seeded
+  // with a value that is surely bigger than the last one used before the restart: wall-clock millis scaled by
+  // TxConstants.MAX_TX_PER_MS. This assumes a single client never issues more than TxConstants.MAX_TX_PER_MS
+  // transactions per millisecond.
+  private final AtomicLong generator = new AtomicLong(System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS);
+
+  /**
+   * Starts a transaction with a freshly generated write pointer.
+   *
+   * @return a {@link TransactionType#SHORT} transaction; no conflict or visibility state is tracked for it
+   */
+  @Override
+  public Transaction startShort() {
+    long wp = getWritePointer();
+    // NOTE: -1 here is because we have logic that uses (readpointer + 1) as a "exclusive stop key" in some datasets
+    return new Transaction(
+        Long.MAX_VALUE - 1, wp, new long[0], new long[0],
+        Transaction.NO_TX_IN_PROGRESS, TransactionType.SHORT);
+  }
+
+  /**
+   * Returns the next write pointer. When the generator has fallen behind the clock-derived value (fewer than
+   * TxConstants.MAX_TX_PER_MS transactions per millisecond), it attempts to jump forward to that value.
+   */
+  private long getWritePointer() {
+    long wp = generator.incrementAndGet();
+    // Scaling by TxConstants.MAX_TX_PER_MS keeps these ids at least close to the ones a real manager would issue.
+    long clockAligned = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS;
+    if (wp < clockAligned) {
+      // Try to advance to the clock-aligned value, but only once: if another caller moved the generator in the
+      // meantime we keep our (still unique) incremented value and realign on a later call.
+      if (generator.compareAndSet(wp, clockAligned)) {
+        wp = clockAligned;
+      }
+    }
+    return wp;
+  }
+
+  /**
+   * Same as {@link #startShort()}; the timeout is ignored because transactions are never tracked.
+   */
+  @Override
+  public Transaction startShort(int timeout) {
+    return startShort();
+  }
+
+  /**
+   * Same as {@link #startShort()}: this client does not distinguish long transactions and returns a SHORT one.
+   */
+  @Override
+  public Transaction startLong() {
+    return startShort();
+  }
+
+  /** Always returns {@code true}: conflicts are never detected. */
+  @Override
+  public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) {
+    return true;
+  }
+
+  /** Always returns {@code true}. */
+  @Override
+  public boolean commit(Transaction tx) {
+    return true;
+  }
+
+  /** No-op. */
+  @Override
+  public void abort(Transaction tx) {
+    // do nothing
+  }
+
+  /** Always returns {@code true}; nothing is recorded. */
+  @Override
+  public boolean invalidate(long tx) {
+    return true;
+  }
+
+  /**
+   * Issues a new write pointer and returns a transaction derived from {@code tx} that carries it as its current
+   * write pointer, with the pointer also appended to the checkpoint write pointers.
+   */
+  @Override
+  public Transaction checkpoint(Transaction tx) {
+    long newWritePointer = getWritePointer();
+    LongArrayList newCheckpointPointers = new LongArrayList(tx.getCheckpointWritePointers());
+    newCheckpointPointers.add(newWritePointer);
+    return new Transaction(tx, newWritePointer, newCheckpointPointers.toLongArray());
+  }
+
+  /**
+   * Snapshots are not supported by this client.
+   *
+   * @throws TransactionCouldNotTakeSnapshotException always
+   */
+  @Override
+  public InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException {
+    throw new TransactionCouldNotTakeSnapshotException(
+        "Snapshot not implemented in detached transaction system client");
+  }
+
+  /** Always reports {@link TxConstants#STATUS_OK}. */
+  @Override
+  public String status() {
+    return TxConstants.STATUS_OK;
+  }
+
+  /** No-op: there is no state to reset. */
+  @Override
+  public void resetState() {
+    // do nothing
+  }
+
+  /** Always returns {@code true}; no invalid list is kept. */
+  @Override
+  public boolean truncateInvalidTx(Set<Long> invalidTxIds) {
+    return true;
+  }
+
+  /** Always returns {@code true}; no invalid list is kept. */
+  @Override
+  public boolean truncateInvalidTxBefore(long time) throws InvalidTruncateTimeException {
+    return true;
+  }
+
+  /** Always returns 0; no invalid list is kept. */
+  @Override
+  public int getInvalidSize() {
+    return 0;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java
new file mode 100644
index 0000000..823f934
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tephra.inmemory;
+
+import com.google.common.util.concurrent.AbstractService;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TxConstants;
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.discovery.Discoverable;
+import org.apache.twill.discovery.DiscoveryService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Transaction server that manages transaction data for the Reactor.
+ * <p>
+ * Transaction server is HA, one can start multiple instances, only one of which is active and will register itself in
+ * discovery service.
+ * </p>
+ */
+public class InMemoryTransactionService extends AbstractService {
+ private static final Logger LOG = LoggerFactory.getLogger(InMemoryTransactionService.class);
+
+ private final DiscoveryService discoveryService;
+ private final String serviceName;
+ protected final Provider<TransactionManager> txManagerProvider;
+ private Cancellable cancelDiscovery;
+ protected TransactionManager txManager;
+
+ // thrift server config
+ protected final String address;
+ protected final int port;
+ protected final int threads;
+ protected final int ioThreads;
+ protected final int maxReadBufferBytes;
+
+ @Inject
+ public InMemoryTransactionService(Configuration conf,
+ DiscoveryService discoveryService,
+ Provider<TransactionManager> txManagerProvider) {
+
+ this.discoveryService = discoveryService;
+ this.txManagerProvider = txManagerProvider;
+ this.serviceName = conf.get(TxConstants.Service.CFG_DATA_TX_DISCOVERY_SERVICE_NAME,
+ TxConstants.Service.DEFAULT_DATA_TX_DISCOVERY_SERVICE_NAME);
+
+ address = conf.get(TxConstants.Service.CFG_DATA_TX_BIND_ADDRESS, TxConstants.Service.DEFAULT_DATA_TX_BIND_ADDRESS);
+ port = conf.getInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, TxConstants.Service.DEFAULT_DATA_TX_BIND_PORT);
+
+ // Retrieve the number of threads for the service
+ threads = conf.getInt(TxConstants.Service.CFG_DATA_TX_SERVER_THREADS,
+ TxConstants.Service.DEFAULT_DATA_TX_SERVER_THREADS);
+ ioThreads = conf.getInt(TxConstants.Service.CFG_DATA_TX_SERVER_IO_THREADS,
+ TxConstants.Service.DEFAULT_DATA_TX_SERVER_IO_THREADS);
+
+ maxReadBufferBytes = conf.getInt(TxConstants.Service.CFG_DATA_TX_THRIFT_MAX_READ_BUFFER,
+ TxConstants.Service.DEFAULT_DATA_TX_THRIFT_MAX_READ_BUFFER);
+
+ LOG.info("Configuring TransactionService" +
+ ", address: " + address +
+ ", port: " + port +
+ ", threads: " + threads +
+ ", io threads: " + ioThreads +
+ ", max read buffer (bytes): " + maxReadBufferBytes);
+ }
+
+ protected void undoRegister() {
+ if (cancelDiscovery != null) {
+ cancelDiscovery.cancel();
+ }
+ }
+
+ protected void doRegister() {
+ cancelDiscovery = discoveryService.register(new Discoverable() {
+ @Override
+ public String getName() {
+ return serviceName;
+ }
+
+ @Override
+ public InetSocketAddress getSocketAddress() {
+ return getAddress();
+ }
+ });
+ }
+
+ protected InetSocketAddress getAddress() {
+ return new InetSocketAddress(1);
+ }
+
+ @Override
+ protected void doStart() {
+ try {
+ txManager = txManagerProvider.get();
+ txManager.startAndWait();
+ doRegister();
+ LOG.info("Transaction Thrift service started successfully on " + getAddress());
+ notifyStarted();
+ } catch (Throwable t) {
+ LOG.info("Transaction Thrift service didn't start on " + getAddress());
+ notifyFailed(t);
+ }
+ }
+
+ @Override
+ protected void doStop() {
+ undoRegister();
+ txManager.stopAndWait();
+ notifyStopped();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
new file mode 100644
index 0000000..da38dd2
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.inmemory;
+
+import com.google.inject.Inject;
+import org.apache.tephra.InvalidTruncateTimeException;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionCouldNotTakeSnapshotException;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionNotInProgressException;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.TxConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * {@link TransactionSystemClient} backed directly by a {@link TransactionManager} in the same JVM: every call is
+ * forwarded to the injected manager without any remote hop.
+ */
+public class InMemoryTxSystemClient implements TransactionSystemClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(InMemoryTxSystemClient.class);
+
+  // Manager that all calls delegate to (package-private and non-final, so package code may access it).
+  TransactionManager txManager;
+
+  /**
+   * @param manager the transaction manager to delegate to
+   */
+  @Inject
+  public InMemoryTxSystemClient(TransactionManager manager) {
+    this.txManager = manager;
+  }
+
+  @Override
+  public Transaction startShort() {
+    return txManager.startShort();
+  }
+
+  @Override
+  public Transaction startShort(int timeout) {
+    return txManager.startShort(timeout);
+  }
+
+  @Override
+  public Transaction startLong() {
+    return txManager.startLong();
+  }
+
+  /**
+   * Asks the manager whether {@code tx} can commit; an empty change set passes without consulting it.
+   */
+  @Override
+  public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException {
+    if (changeIds.isEmpty()) {
+      return true;
+    }
+    return txManager.canCommit(tx, changeIds);
+  }
+
+  @Override
+  public boolean commit(Transaction tx) throws TransactionNotInProgressException {
+    return txManager.commit(tx);
+  }
+
+  @Override
+  public void abort(Transaction tx) {
+    txManager.abort(tx);
+  }
+
+  @Override
+  public boolean invalidate(long tx) {
+    return txManager.invalidate(tx);
+  }
+
+  @Override
+  public Transaction checkpoint(Transaction tx) throws TransactionNotInProgressException {
+    return txManager.checkpoint(tx);
+  }
+
+  /**
+   * Captures a snapshot of the manager's state in memory and exposes it as a stream.
+   *
+   * @throws TransactionCouldNotTakeSnapshotException if the manager does not take a snapshot or an I/O error occurs
+   */
+  @Override
+  public InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException {
+    try {
+      return new ByteArrayInputStream(serializeSnapshot());
+    } catch (IOException e) {
+      LOG.error("Snapshot could not be taken", e);
+      throw new TransactionCouldNotTakeSnapshotException(e.getMessage());
+    }
+  }
+
+  /** Writes the manager's snapshot into a byte array, failing if the manager reports that none was taken. */
+  private byte[] serializeSnapshot() throws IOException, TransactionCouldNotTakeSnapshotException {
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    boolean taken;
+    try {
+      taken = txManager.takeSnapshot(buffer);
+    } finally {
+      buffer.close();
+    }
+    if (!taken) {
+      throw new TransactionCouldNotTakeSnapshotException("Transaction manager did not take a snapshot.");
+    }
+    return buffer.toByteArray();
+  }
+
+  /** Reports OK while the manager is running, NOTOK otherwise. */
+  @Override
+  public String status() {
+    if (txManager.isRunning()) {
+      return TxConstants.STATUS_OK;
+    }
+    return TxConstants.STATUS_NOTOK;
+  }
+
+  @Override
+  public void resetState() {
+    txManager.resetState();
+  }
+
+  @Override
+  public boolean truncateInvalidTx(Set<Long> invalidTxIds) {
+    return txManager.truncateInvalidTx(invalidTxIds);
+  }
+
+  @Override
+  public boolean truncateInvalidTxBefore(long time) throws InvalidTruncateTimeException {
+    return txManager.truncateInvalidTxBefore(time);
+  }
+
+  @Override
+  public int getInvalidSize() {
+    return txManager.getInvalidSize();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
new file mode 100644
index 0000000..2f60225
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.inmemory;
+
+import org.apache.tephra.InvalidTruncateTimeException;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionCouldNotTakeSnapshotException;
+import org.apache.tephra.TransactionNotInProgressException;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.TransactionType;
+import org.apache.tephra.TxConstants;
+
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * Dummy implementation of TxSystemClient. May be useful for perf testing.
+ *
+ * <p>It hands out increasing write pointers starting at 1 and otherwise accepts every request without tracking any
+ * state: conflicts are never detected, commits always succeed and snapshots are not supported.</p>
+ *
+ * <p>NOTE: write pointers come from an unsynchronized counter, so an instance should not be shared between
+ * threads.</p>
+ */
+public class MinimalTxSystemClient implements TransactionSystemClient {
+  // Next write pointer to hand out; plain (non-atomic) increment.
+  private long currentTxPointer = 1;
+
+  /**
+   * Starts a transaction whose write pointer is the next value of the local counter.
+   */
+  @Override
+  public Transaction startShort() {
+    long wp = currentTxPointer++;
+    // NOTE: -1 here is because we have logic that uses (readpointer + 1) as a "exclusive stop key" in some datasets
+    return new Transaction(
+        Long.MAX_VALUE - 1, wp, new long[0], new long[0],
+        Transaction.NO_TX_IN_PROGRESS, TransactionType.SHORT);
+  }
+
+  /** Same as {@link #startShort()}; the timeout is ignored. */
+  @Override
+  public Transaction startShort(int timeout) {
+    return startShort();
+  }
+
+  /** Same as {@link #startShort()}: returns a SHORT transaction. */
+  @Override
+  public Transaction startLong() {
+    return startShort();
+  }
+
+  /** Always returns {@code true}: conflicts are never detected. */
+  @Override
+  public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) {
+    return true;
+  }
+
+  /** Always returns {@code true}. */
+  @Override
+  public boolean commit(Transaction tx) {
+    return true;
+  }
+
+  /** No-op. */
+  @Override
+  public void abort(Transaction tx) {
+    // do nothing
+  }
+
+  /** Always returns {@code true}; nothing is recorded. */
+  @Override
+  public boolean invalidate(long tx) {
+    return true;
+  }
+
+  /** Returns {@code tx} unchanged; no new write pointer is issued. */
+  @Override
+  public Transaction checkpoint(Transaction tx) throws TransactionNotInProgressException {
+    return tx;
+  }
+
+  /**
+   * Snapshots are not supported by this client.
+   *
+   * @throws TransactionCouldNotTakeSnapshotException always
+   */
+  @Override
+  public InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException {
+    throw new TransactionCouldNotTakeSnapshotException(
+        "Snapshot not implemented in minimal transaction system client");
+  }
+
+  /** Always reports {@link TxConstants#STATUS_OK}. */
+  @Override
+  public String status() {
+    return TxConstants.STATUS_OK;
+  }
+
+  /** No-op: the write-pointer counter is not reset. */
+  @Override
+  public void resetState() {
+    // do nothing
+  }
+
+  /** Always returns {@code true}; no invalid list is kept. */
+  @Override
+  public boolean truncateInvalidTx(Set<Long> invalidTxIds) {
+    return true;
+  }
+
+  /** Always returns {@code true}; no invalid list is kept. */
+  @Override
+  public boolean truncateInvalidTxBefore(long time) throws InvalidTruncateTimeException {
+    return true;
+  }
+
+  /** Always returns 0; no invalid list is kept. */
+  @Override
+  public int getInvalidSize() {
+    return 0;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/inmemory/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/package-info.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/package-info.java
new file mode 100644
index 0000000..2e396fe
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains in-memory implementations of the transaction system v2: transaction system clients and an in-process transaction service.
+ */
+package org.apache.tephra.inmemory;
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/metrics/DefaultMetricsCollector.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/metrics/DefaultMetricsCollector.java b/tephra-core/src/main/java/org/apache/tephra/metrics/DefaultMetricsCollector.java
new file mode 100644
index 0000000..86d2afa
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/metrics/DefaultMetricsCollector.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.Slf4jReporter;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.TxConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Default metrics collector implementation using <a href="http://metrics.dropwizard.io">Dropwizard (formerly
+ * Coda Hale / Yammer) Metrics</a>.
+ *
+ * <p>Metrics are published through a JMX reporter and periodically logged to the {@code tephra-metrics} logger.
+ * The reporting frequency for the log reporter can be configured by setting the
+ * {@code data.tx.metrics.period} configuration property to the reporting frequency in seconds.
+ * </p>
+ *
+ * <p>{@link #configure(Configuration)} must be called before the service is started.</p>
+ */
+public class DefaultMetricsCollector extends TxMetricsCollector {
+  private static final Logger LOG = LoggerFactory.getLogger(DefaultMetricsCollector.class);
+
+  private final MetricRegistry metrics = new MetricRegistry();
+  // Backing values for gauges, keyed by metric name; each holder is registered with the registry exactly once.
+  private final ConcurrentMap<String, AtomicLong> gauges = Maps.newConcurrentMap();
+  // Reporters are created in configure() and are null until then.
+  private JmxReporter jmxReporter;
+  private ScheduledReporter reporter;
+  private int reportPeriod;
+
+  /**
+   * Reads the report period and builds the JMX and SLF4J reporters.
+   *
+   * @param conf configuration holding {@link TxConstants.Metrics#REPORT_PERIOD_KEY}
+   */
+  @Override
+  public void configure(Configuration conf) {
+    // initialize selected output reporter
+    reportPeriod = conf.getInt(TxConstants.Metrics.REPORT_PERIOD_KEY, TxConstants.Metrics.REPORT_PERIOD_DEFAULT);
+    LOG.info("Configured metrics report to emit every {} seconds", reportPeriod);
+    // TODO: reporters should be pluggable based on injection
+    jmxReporter = JmxReporter.forRegistry(metrics).build();
+    reporter = Slf4jReporter.forRegistry(metrics)
+                            .outputTo(LoggerFactory.getLogger("tephra-metrics"))
+                            .convertRatesTo(TimeUnit.SECONDS)
+                            .convertDurationsTo(TimeUnit.MILLISECONDS)
+                            .build();
+  }
+
+  /**
+   * Sets the current value of a gauge, registering it on first use. Tags are ignored.
+   */
+  @Override
+  public void gauge(String metricName, int value, String... tags) {
+    AtomicLong gauge = gauges.get(metricName);
+    if (gauge == null) {
+      final AtomicLong newValue = new AtomicLong();
+      AtomicLong existing = gauges.putIfAbsent(metricName, newValue);
+      if (existing == null) {
+        // first to set the value, need to register the metric
+        metrics.register(metricName, new Gauge<Long>() {
+          @Override
+          public Long getValue() {
+            return newValue.get();
+          }
+        });
+        gauge = newValue;
+      } else {
+        // someone else set it first: reuse their holder
+        gauge = existing;
+      }
+    }
+    gauge.set(value);
+  }
+
+  @Override
+  public void histogram(String metricName, int value) {
+    metrics.histogram(metricName).update(value);
+  }
+
+  @Override
+  public void rate(String metricName) {
+    metrics.meter(metricName).mark();
+  }
+
+  @Override
+  public void rate(String metricName, int count) {
+    metrics.meter(metricName).mark(count);
+  }
+
+  /**
+   * Starts both reporters.
+   *
+   * @throws IllegalStateException if {@link #configure(Configuration)} has not been called
+   */
+  @Override
+  protected void startUp() throws Exception {
+    if (jmxReporter == null || reporter == null) {
+      throw new IllegalStateException("configure(Configuration) must be called before starting the metrics collector");
+    }
+    jmxReporter.start();
+    reporter.start(reportPeriod, TimeUnit.SECONDS);
+    LOG.info("Started metrics reporter");
+  }
+
+  /** Stops both reporters; the log reporter is stopped even if stopping the JMX reporter fails. */
+  @Override
+  protected void shutDown() throws Exception {
+    try {
+      jmxReporter.stop();
+    } finally {
+      reporter.stop();
+    }
+    LOG.info("Stopped metrics reporter");
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/metrics/MetricsCollector.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/metrics/MetricsCollector.java b/tephra-core/src/main/java/org/apache/tephra/metrics/MetricsCollector.java
new file mode 100644
index 0000000..45668e1
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/metrics/MetricsCollector.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.metrics;
+
+import com.google.common.util.concurrent.Service;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Basic API for Tephra to support system metrics.
+ *
+ * <p>A collector is a Guava {@link Service}; {@link #configure(Configuration)} is invoked before the service is
+ * started.</p>
+ */
+public interface MetricsCollector extends Service {
+
+  /**
+   * Called before the collector service is started, allowing the collector to set up any required configuration.
+   *
+   * @param conf configuration to read collector settings from
+   */
+  void configure(Configuration conf);
+
+  /**
+   * Reports a metric as an absolute value.
+   *
+   * @param metricName name of the metric
+   * @param value current value
+   * @param tags optional tags qualifying the metric; may be empty
+   */
+  void gauge(String metricName, int value, String... tags);
+
+  /**
+   * Reports a metric as a count over a given time duration, with an implicit count of 1.
+   *
+   * @param metricName name of the metric
+   */
+  void rate(String metricName);
+
+  /**
+   * Reports a metric as a count over a given time duration.
+   *
+   * @param metricName name of the metric
+   * @param count number of occurrences to add
+   */
+  void rate(String metricName, int count);
+
+  /**
+   * Reports a metric whose value distribution is tracked.
+   *
+   * @param metricName name of the metric
+   * @param value observed value
+   */
+  void histogram(String metricName, int value);
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/metrics/TxMetricsCollector.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/metrics/TxMetricsCollector.java b/tephra-core/src/main/java/org/apache/tephra/metrics/TxMetricsCollector.java
new file mode 100644
index 0000000..4dc7e2f
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/metrics/TxMetricsCollector.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.metrics;
+
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Metrics Collector Class, to emit Transaction Related Metrics.
+ * Note: This default implementation is a no-op and doesn't emit any metrics
+ */
+public class TxMetricsCollector extends AbstractIdleService implements MetricsCollector {
+
+ @Override
+ public void gauge(String metricName, int value, String... tags) {
+ //no-op
+ }
+
+ @Override
+ public void rate(String metricName) {
+ // no-op
+ }
+
+ @Override
+ public void rate(String metricName, int count) {
+ // no-op
+ }
+
+ @Override
+ public void histogram(String metricName, int value) {
+ // no-op
+ }
+
+ @Override
+ public void configure(Configuration conf) {
+ // no-op
+ }
+
+ /* Service methods */
+
+ @Override
+ protected void startUp() throws Exception {
+ // no-op
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ // no-op
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/package-info.java b/tephra-core/src/main/java/org/apache/tephra/package-info.java
new file mode 100644
index 0000000..dea84ac
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains implementations of the transaction system v2.
+ */
+package org.apache.tephra;
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java b/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java
new file mode 100644
index 0000000..b1e0978
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.tephra.metrics.MetricsCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Common implementation of a transaction log, backed by file reader and writer based storage. Classes extending
+ * this class, must also implement {@link TransactionLogWriter} and {@link TransactionLogReader}.
+ */
+public abstract class AbstractTransactionLog implements TransactionLog {
+ /** Time limit, in milliseconds, of an append to the transaction log before we log it as "slow". */
+ private static final long SLOW_APPEND_THRESHOLD = 1000L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionLog.class);
+
+ private final AtomicLong logSequence = new AtomicLong();
+ private final MetricsCollector metricsCollector;
+ protected long timestamp;
+ private volatile boolean initialized;
+ private volatile boolean closed;
+ private AtomicLong syncedUpTo = new AtomicLong();
+ private List<Entry> pendingWrites = Lists.newLinkedList();
+ private TransactionLogWriter writer;
+
+ public AbstractTransactionLog(long timestamp, MetricsCollector metricsCollector) {
+ this.timestamp = timestamp;
+ this.metricsCollector = metricsCollector;
+ }
+
+ /**
+ * Initializes the log file, opening a file writer. Clients calling {@code init()} should ensure that they
+ * also call {@link HDFSTransactionLog#close()}.
+ * @throws java.io.IOException If an error is encountered initializing the file writer.
+ */
+ public synchronized void init() throws IOException {
+ if (initialized) {
+ return;
+ }
+ this.writer = createWriter();
+ this.initialized = true;
+ }
+
+ /**
+ * Returns a log writer to be used for appending any new {@link TransactionEdit} objects.
+ */
+ protected abstract TransactionLogWriter createWriter() throws IOException;
+
+ @Override
+ public abstract String getName();
+
+ @Override
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public void append(TransactionEdit edit) throws IOException {
+ long startTime = System.nanoTime();
+ synchronized (this) {
+ ensureAvailable();
+
+ Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit);
+
+ // add to pending edits
+ append(entry);
+ }
+
+ // wait for sync to complete
+ sync();
+ long durationMillis = (System.nanoTime() - startTime) / 1000000L;
+ if (durationMillis > SLOW_APPEND_THRESHOLD) {
+ LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec.");
+ }
+ }
+
+ @Override
+ public void append(List<TransactionEdit> edits) throws IOException {
+ long startTime = System.nanoTime();
+ synchronized (this) {
+ ensureAvailable();
+
+ for (TransactionEdit edit : edits) {
+ Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit);
+
+ // add to pending edits
+ append(entry);
+ }
+ }
+
+ // wait for sync to complete
+ sync();
+ long durationMillis = (System.nanoTime() - startTime) / 1000000L;
+ if (durationMillis > SLOW_APPEND_THRESHOLD) {
+ LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec.");
+ }
+ }
+
+ private void ensureAvailable() throws IOException {
+ if (closed) {
+ throw new IOException("Log " + getName() + " is already closed, cannot append!");
+ }
+ if (!initialized) {
+ init();
+ }
+ }
+
+ /*
+ * Appends new writes to the pendingWrites. It is better to keep it in
+ * our own queue rather than writing it to the HDFS output stream because
+ * HDFSOutputStream.writeChunk is not lightweight at all.
+ */
+ private void append(Entry e) throws IOException {
+ pendingWrites.add(e);
+ }
+
+ // Returns all currently pending writes. New writes
+ // will accumulate in a new list.
+ private List<Entry> getPendingWrites() {
+ synchronized (this) {
+ List<Entry> save = this.pendingWrites;
+ this.pendingWrites = new LinkedList<>();
+ return save;
+ }
+ }
+
+ private void sync() throws IOException {
+ // writes out pending entries to the HLog
+ TransactionLogWriter tmpWriter = null;
+ long latestSeq = 0;
+ int entryCount = 0;
+ synchronized (this) {
+ if (closed) {
+ return;
+ }
+ // prevent writer being dereferenced
+ tmpWriter = writer;
+
+ List<Entry> currentPending = getPendingWrites();
+ if (!currentPending.isEmpty()) {
+ tmpWriter.commitMarker(currentPending.size());
+ }
+
+ // write out all accumulated entries to log.
+ for (Entry e : currentPending) {
+ tmpWriter.append(e);
+ entryCount++;
+ latestSeq = Math.max(latestSeq, e.getKey().get());
+ }
+ }
+
+ long lastSynced = syncedUpTo.get();
+ // someone else might have already synced our edits, avoid double syncing
+ if (lastSynced < latestSeq) {
+ tmpWriter.sync();
+ metricsCollector.histogram("wal.sync.size", entryCount);
+ syncedUpTo.compareAndSet(lastSynced, latestSeq);
+ }
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if (closed) {
+ return;
+ }
+ // perform a final sync if any outstanding writes
+ if (!pendingWrites.isEmpty()) {
+ sync();
+ }
+ // NOTE: writer is lazy-inited, so it can be null
+ if (writer != null) {
+ this.writer.close();
+ }
+ this.closed = true;
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ @Override
+ public abstract TransactionLogReader getReader() throws IOException;
+
+ /**
+ * Represents an entry in the transaction log. Each entry consists of a key, generated from an incrementing sequence
+ * number, and a value, the {@link TransactionEdit} being stored.
+ */
+ public static class Entry implements Writable {
+ private LongWritable key;
+ private TransactionEdit edit;
+
+ // for Writable
+ public Entry() {
+ this.key = new LongWritable();
+ this.edit = new TransactionEdit();
+ }
+
+ public Entry(LongWritable key, TransactionEdit edit) {
+ this.key = key;
+ this.edit = edit;
+ }
+
+ public LongWritable getKey() {
+ return this.key;
+ }
+
+ public TransactionEdit getEdit() {
+ return this.edit;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ this.key.write(out);
+ this.edit.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.key.readFields(in);
+ this.edit.readFields(in);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionStateStorage.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionStateStorage.java b/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionStateStorage.java
new file mode 100644
index 0000000..1c51ccd
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionStateStorage.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Common base class for all transaction storage implementations. This implement logic to prefix a snapshot
+ * with a version when encoding, and to select the correct codec for decoding based on this version prefix.
+ */
+public abstract class AbstractTransactionStateStorage extends AbstractIdleService implements TransactionStateStorage {
+
+ protected final SnapshotCodecProvider codecProvider;
+
+ protected AbstractTransactionStateStorage(SnapshotCodecProvider codecProvider) {
+ this.codecProvider = codecProvider;
+ }
+
+ @Override
+ public void writeSnapshot(OutputStream out, TransactionSnapshot snapshot) throws IOException {
+ codecProvider.encode(out, snapshot);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/CommitMarkerCodec.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/CommitMarkerCodec.java b/tephra-core/src/main/java/org/apache/tephra/persist/CommitMarkerCodec.java
new file mode 100644
index 0000000..c4f02e5
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/CommitMarkerCodec.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.primitives.Bytes;
+import com.google.common.primitives.Ints;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.tephra.TxConstants;
+
+import java.io.Closeable;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Class to read and write commit markers used in {@link HDFSTransactionLogReaderV2} and above.
+ */
+public class CommitMarkerCodec implements Closeable {
+ private static final byte[] KEY_BYTES = TxConstants.TransactionLog.NUM_ENTRIES_APPENDED.getBytes(Charsets.UTF_8);
+ private final DataOutputBuffer rawKey;
+ private final DataOutputBuffer rawValue;
+ private SequenceFile.ValueBytes valueBytes;
+
+ public CommitMarkerCodec() {
+ this.rawKey = new DataOutputBuffer();
+ this.rawValue = new DataOutputBuffer();
+ }
+
+ @Override
+ public void close() throws IOException {
+ rawKey.close();
+ rawValue.close();
+ }
+
+ // 1. Returns the count when the marker is written correctly
+ // 2. If data is incorrect (for ex, incorrect key, mismatch in key/value/record length), we throw IOException
+ // since this indicates corrupted log file
+ // 3. If data is incomplete, then we throw EOFException which is handled gracefully by the calling method
+ // since we can recover without any consequence
+ public int readMarker(SequenceFile.Reader reader) throws IOException {
+ if (valueBytes == null) {
+ valueBytes = reader.createValueBytes();
+ }
+ rawKey.reset();
+ rawValue.reset();
+
+ // valueBytes need not be reset since nextRaw call does it (and it is a private method)
+ int status = reader.nextRaw(rawKey, valueBytes);
+
+ // if we reach EOF, return -1
+ if (status == -1) {
+ return -1;
+ }
+
+ // Check if the marker key is valid and return the count
+ if (isMarkerValid()) {
+ valueBytes.writeUncompressedBytes(rawValue);
+ rawValue.flush();
+ // rawValue.getData() may return a larger byte array but Ints.fromByteArray will only read the first four bytes
+ return Ints.fromByteArray(rawValue.getData());
+ }
+
+ // EOF not reached and marker is not valid, then thrown an IOException since we can't make progress
+ throw new IOException(String.format("Invalid key for num entries appended found %s, expected : %s",
+ new String(rawKey.getData()), TxConstants.TransactionLog.NUM_ENTRIES_APPENDED));
+ }
+
+ private boolean isMarkerValid() {
+ // rawKey should have the expected length and the matching bytes should start at index 0
+ return rawKey.getLength() == KEY_BYTES.length && Bytes.indexOf(rawKey.getData(), KEY_BYTES) == 0;
+ }
+
+ public static void writeMarker(SequenceFile.Writer writer, int count) throws IOException {
+ writer.appendRaw(KEY_BYTES, 0, KEY_BYTES.length, new CommitEntriesCount(count));
+ }
+
+ @VisibleForTesting
+ static final class CommitEntriesCount implements SequenceFile.ValueBytes {
+ private final int numEntries;
+
+ public CommitEntriesCount(int numEntries) {
+ this.numEntries = numEntries;
+ }
+
+ @Override
+ public void writeUncompressedBytes(DataOutputStream outStream) throws IOException {
+ outStream.write(Ints.toByteArray(numEntries));
+ }
+
+ @Override
+ public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException {
+ throw new IllegalArgumentException("Commit Entries count writing is not expected to be compressed.");
+ }
+
+ @Override
+ public int getSize() {
+ return Ints.BYTES;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLog.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLog.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLog.java
new file mode 100644
index 0000000..ba781ac
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLog.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.metrics.MetricsCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+/**
+ * Allows reading from and writing to a transaction write-ahead log stored in HDFS.
+ */
+public class HDFSTransactionLog extends AbstractTransactionLog {
+ private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLog.class);
+
+ private final FileSystem fs;
+ private final Configuration hConf;
+ private final Path logPath;
+
+ /**
+ * Creates a new HDFS-backed write-ahead log for storing transaction state.
+ * @param fs Open FileSystem instance for opening log files in HDFS.
+ * @param hConf HDFS cluster configuration.
+ * @param logPath Path to the log file.
+ */
+ public HDFSTransactionLog(final FileSystem fs, final Configuration hConf,
+ final Path logPath, long timestamp, MetricsCollector metricsCollector) {
+ super(timestamp, metricsCollector);
+ this.fs = fs;
+ this.hConf = hConf;
+ this.logPath = logPath;
+ }
+
+ @Override
+ protected TransactionLogWriter createWriter() throws IOException {
+ return new LogWriter(fs, hConf, logPath);
+ }
+
+ @Override
+ public String getName() {
+ return logPath.getName();
+ }
+
+ @Override
+ public TransactionLogReader getReader() throws IOException {
+ FileStatus status = fs.getFileStatus(logPath);
+ long length = status.getLen();
+
+ TransactionLogReader reader = null;
+ // check if this file needs to be recovered due to failure
+ // Check for possibly empty file. With appends, currently Hadoop reports a
+ // zero length even if the file has been sync'd. Revisit if HDFS-376 or
+ // HDFS-878 is committed.
+ if (length <= 0) {
+ LOG.warn("File " + logPath + " might be still open, length is 0");
+ }
+
+ try {
+ HDFSUtil hdfsUtil = new HDFSUtil();
+ hdfsUtil.recoverFileLease(fs, logPath, hConf);
+ try {
+ FileStatus newStatus = fs.getFileStatus(logPath);
+ LOG.info("New file size for " + logPath + " is " + newStatus.getLen());
+ SequenceFile.Reader fileReader = new SequenceFile.Reader(fs, logPath, hConf);
+ reader = new HDFSTransactionLogReaderSupplier(fileReader).get();
+ } catch (EOFException e) {
+ if (length <= 0) {
+ // TODO should we ignore an empty, not-last log file if skip.errors
+ // is false? Either way, the caller should decide what to do. E.g.
+ // ignore if this is the last log in sequence.
+ // TODO is this scenario still possible if the log has been
+ // recovered (i.e. closed)
+ LOG.warn("Could not open " + logPath + " for reading. File is empty", e);
+ return null;
+ } else {
+ // EOFException being ignored
+ return null;
+ }
+ }
+ } catch (IOException e) {
+ throw e;
+ }
+ return reader;
+ }
+
+ @VisibleForTesting
+ static final class LogWriter implements TransactionLogWriter {
+ private final SequenceFile.Writer internalWriter;
+ public LogWriter(FileSystem fs, Configuration hConf, Path logPath) throws IOException {
+ // TODO: retry a few times to ride over transient failures?
+ SequenceFile.Metadata metadata = new SequenceFile.Metadata();
+ metadata.set(new Text(TxConstants.TransactionLog.VERSION_KEY),
+ new Text(Byte.toString(TxConstants.TransactionLog.CURRENT_VERSION)));
+
+ this.internalWriter = SequenceFile.createWriter(fs, hConf, logPath, LongWritable.class, TransactionEdit.class,
+ SequenceFile.CompressionType.NONE, null, null, metadata);
+ LOG.debug("Created a new TransactionLog writer for " + logPath);
+ }
+
+ @Override
+ public void append(Entry entry) throws IOException {
+ internalWriter.append(entry.getKey(), entry.getEdit());
+ }
+
+ @Override
+ public void commitMarker(int count) throws IOException {
+ CommitMarkerCodec.writeMarker(internalWriter, count);
+ }
+
+ @Override
+ public void sync() throws IOException {
+ internalWriter.syncFs();
+ }
+
+ @Override
+ public void close() throws IOException {
+ internalWriter.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java
new file mode 100644
index 0000000..a517903
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.base.Supplier;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.tephra.TxConstants;
+
+/**
+ * Provides the correct version of {@link TransactionLogReader}, based on the log's version metadata,
+ * to read HDFS Transaction Logs.
+ */
+public class HDFSTransactionLogReaderSupplier implements Supplier<TransactionLogReader> {
+ private final SequenceFile.Reader reader;
+ private final byte version;
+ private TransactionLogReader logReader;
+
+ public HDFSTransactionLogReaderSupplier(SequenceFile.Reader reader) {
+ this.reader = reader;
+ Text versionInfo = reader.getMetadata().get(new Text(TxConstants.TransactionLog.VERSION_KEY));
+ this.version = versionInfo == null ? 1 : Byte.parseByte(versionInfo.toString());
+ }
+
+ @Override
+ public TransactionLogReader get() {
+ if (logReader != null) {
+ return logReader;
+ }
+
+ switch (version) {
+ case 2:
+ logReader = new HDFSTransactionLogReaderV2(reader);
+ return logReader;
+ case 1:
+ logReader = new HDFSTransactionLogReaderV1(reader);
+ return logReader;
+ default:
+ throw new IllegalArgumentException(String.format("Invalid version %s found in the Transaction Log", version));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java
new file mode 100644
index 0000000..faefaec
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+/**
+ * {@link TransactionLogReader} that can read v1 (default) version of Transaction logs. The logs are expected to
+ * have a sequence of {@link TransactionEdit}s.
+ */
+public class HDFSTransactionLogReaderV1 implements TransactionLogReader {
+ private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLogReaderV1.class);
+ private final SequenceFile.Reader reader;
+ private final LongWritable key;
+ private boolean closed;
+
+ public HDFSTransactionLogReaderV1(SequenceFile.Reader reader) {
+ this.reader = reader;
+ this.key = new LongWritable();
+ }
+
+ @Override
+ public TransactionEdit next() throws IOException {
+ return next(new TransactionEdit());
+ }
+
+ @Override
+ public TransactionEdit next(TransactionEdit reuse) throws IOException {
+ if (closed) {
+ return null;
+ }
+
+ try {
+ boolean successful = reader.next(key, reuse);
+ return successful ? reuse : null;
+ } catch (EOFException e) {
+ LOG.warn("Hit an unexpected EOF while trying to read the Transaction Edit. Skipping the entry.", e);
+ return null;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closed) {
+ return;
+ }
+ reader.close();
+ closed = true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java
new file mode 100644
index 0000000..ce50da8
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+/**
+ * {@link TransactionLogReader} that can read v2 version of Transaction logs. The logs are expected to
+ * have commit markers that indicates the size of the batch of {@link TransactionEdit}s (follows the marker),
+ * that were synced together. If the expected number of {@link TransactionEdit}s are not present then that set of
+ * {@link TransactionEdit}s are discarded.
+ */
+public class HDFSTransactionLogReaderV2 implements TransactionLogReader {
+ private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLogReaderV2.class);
+
+ private final SequenceFile.Reader reader;
+ private final Queue<TransactionEdit> transactionEdits;
+ private final CommitMarkerCodec commitMarkerCodec;
+ private final LongWritable key;
+
+ private boolean closed;
+
+ public HDFSTransactionLogReaderV2(SequenceFile.Reader reader) {
+ this.reader = reader;
+ this.transactionEdits = new ArrayDeque<>();
+ this.key = new LongWritable();
+ this.commitMarkerCodec = new CommitMarkerCodec();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closed) {
+ return;
+ }
+ try {
+ commitMarkerCodec.close();
+ } finally {
+ reader.close();
+ closed = true;
+ }
+ }
+
+ @Override
+ public TransactionEdit next() throws IOException {
+ return next(null);
+ }
+
+ @Override
+ public TransactionEdit next(TransactionEdit reuse) throws IOException {
+ if (closed) {
+ return null;
+ }
+
+ if (!transactionEdits.isEmpty()) {
+ return transactionEdits.remove();
+ }
+
+ // Fetch the 'marker' and read 'marker' number of edits
+ populateTransactionEdits();
+ return transactionEdits.poll();
+ }
+
+ private void populateTransactionEdits() throws IOException {
+ // read the marker to determine numEntries to read.
+ int numEntries = 0;
+ try {
+ // can throw EOFException if reading of incomplete commit marker, no other action required since we can safely
+ // ignore this
+ numEntries = commitMarkerCodec.readMarker(reader);
+ } catch (EOFException e) {
+ LOG.warn("Reached EOF in log while trying to read commit marker", e);
+ }
+
+ for (int i = 0; i < numEntries; i++) {
+ TransactionEdit edit = new TransactionEdit();
+ try {
+ if (reader.next(key, edit)) {
+ transactionEdits.add(edit);
+ } else {
+ throw new EOFException("Attempt to read TransactionEdit failed.");
+ }
+ } catch (EOFException e) {
+ // we have reached EOF before reading back numEntries, we clear the partial list and return.
+ LOG.warn("Reached EOF in log before reading {} entries. Ignoring all {} edits since the last marker",
+ numEntries, transactionEdits.size(), e);
+ transactionEdits.clear();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionStateStorage.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionStateStorage.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionStateStorage.java
new file mode 100644
index 0000000..d751ae2
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionStateStorage.java
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.CountingInputStream;
+import com.google.common.primitives.Longs;
+import com.google.inject.Inject;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.metrics.MetricsCollector;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+import org.apache.tephra.util.ConfigurationFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * Handles persistence of transaction snapshot and logs to a directory in HDFS.
+ *
+ * The directory used for file storage is configured using the {@code data.tx.snapshot.dir} configuration property.
+ * Both snapshot and transaction log files are suffixed with a timestamp to allow easy ordering. Snapshot files
+ * are written with the filename "snapshot.<timestamp>". Transaction log files are written with the filename
+ * "txlog.<timestamp>".
+ */
+public class HDFSTransactionStateStorage extends AbstractTransactionStateStorage {
+ private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionStateStorage.class);
+
+ private static final String SNAPSHOT_FILE_PREFIX = "snapshot.";
+ private static final String TMP_SNAPSHOT_FILE_PREFIX = ".in-progress.snapshot.";
+ private static final String LOG_FILE_PREFIX = "txlog.";
+
+ private static final PathFilter SNAPSHOT_FILE_FILTER = new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return path.getName().startsWith(SNAPSHOT_FILE_PREFIX);
+ }
+ };
+
+ // buffer size used for HDFS reads and writes
+ private static final int BUFFER_SIZE = 16384;
+
+ private final Configuration hConf;
+ private final String configuredSnapshotDir;
+ private final MetricsCollector metricsCollector;
+ private FileSystem fs;
+ private Path snapshotDir;
+
+ @Inject
+ public HDFSTransactionStateStorage(Configuration hConf, SnapshotCodecProvider codecProvider,
+ MetricsCollector metricsCollector) {
+ super(codecProvider);
+ this.hConf = hConf;
+ this.configuredSnapshotDir = hConf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR);
+ this.metricsCollector = metricsCollector;
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ Preconditions.checkState(configuredSnapshotDir != null,
+ "Snapshot directory is not configured. Please set " + TxConstants.Manager.CFG_TX_SNAPSHOT_DIR +
+ " in configuration.");
+ String hdfsUser = hConf.get(TxConstants.Manager.CFG_TX_HDFS_USER);
+ if (hdfsUser == null || UserGroupInformation.isSecurityEnabled()) {
+ if (hdfsUser != null && LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring configuration {}={}, running on secure Hadoop",
+ TxConstants.Manager.CFG_TX_HDFS_USER, hdfsUser);
+ }
+ // NOTE: we can start multiple times this storage. As hdfs uses per-jvm cache, we want to create new fs instead
+ // of getting closed one
+ fs = FileSystem.newInstance(FileSystem.getDefaultUri(hConf), hConf);
+ } else {
+ fs = FileSystem.newInstance(FileSystem.getDefaultUri(hConf), hConf, hdfsUser);
+ }
+ snapshotDir = new Path(configuredSnapshotDir);
+ LOG.info("Using snapshot dir " + snapshotDir);
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ fs.close();
+ }
+
+ @Override
+ public void writeSnapshot(TransactionSnapshot snapshot) throws IOException {
+ // create a temporary file, and save the snapshot
+ Path snapshotTmpFile = new Path(snapshotDir, TMP_SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp());
+ LOG.debug("Writing snapshot to temporary file {}", snapshotTmpFile);
+
+ FSDataOutputStream out = fs.create(snapshotTmpFile, false, BUFFER_SIZE);
+ // encode the snapshot and stream the serialized version to the file
+ try {
+ codecProvider.encode(out, snapshot);
+ } finally {
+ out.close();
+ }
+
+ // move the temporary file into place with the correct filename
+ Path finalFile = new Path(snapshotDir, SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp());
+ fs.rename(snapshotTmpFile, finalFile);
+ LOG.debug("Completed snapshot to file {}", finalFile);
+ }
+
+ @Override
+ public TransactionSnapshot getLatestSnapshot() throws IOException {
+ InputStream in = getLatestSnapshotInputStream();
+ if (in == null) {
+ return null;
+ }
+ try {
+ return readSnapshotInputStream(in);
+ } finally {
+ in.close();
+ }
+ }
+
+ @Override
+ public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException {
+ InputStream in = getLatestSnapshotInputStream();
+ if (in == null) {
+ return null;
+ }
+ try {
+ return readTransactionVisibilityStateFromInputStream(in);
+ } finally {
+ in.close();
+ }
+ }
+
+ private InputStream getLatestSnapshotInputStream() throws IOException {
+ TimestampedFilename[] snapshots = listSnapshotFiles();
+ Arrays.sort(snapshots);
+ if (snapshots.length > 0) {
+ // last is the most recent
+ return fs.open(snapshots[snapshots.length - 1].getPath(), BUFFER_SIZE);
+ }
+
+ LOG.info("No snapshot files found in {}", snapshotDir);
+ return null;
+ }
+
+ private TransactionSnapshot readSnapshotInputStream(InputStream in) throws IOException {
+ CountingInputStream countingIn = new CountingInputStream(in);
+ TransactionSnapshot snapshot = codecProvider.decode(countingIn);
+ LOG.info("Read encoded transaction snapshot of {} bytes", countingIn.getCount());
+ return snapshot;
+ }
+
+ private TransactionVisibilityState readTransactionVisibilityStateFromInputStream(InputStream in) throws IOException {
+ CountingInputStream countingIn = new CountingInputStream(in);
+ TransactionVisibilityState state = codecProvider.decodeTransactionVisibilityState(countingIn);
+ LOG.info("Read encoded transaction snapshot of {} bytes", countingIn.getCount());
+ return state;
+ }
+
+ private TransactionSnapshot readSnapshotFile(Path filePath) throws IOException {
+ FSDataInputStream in = fs.open(filePath, BUFFER_SIZE);
+ try {
+ return readSnapshotInputStream(in);
+ } finally {
+ in.close();
+ }
+ }
+
+ private TimestampedFilename[] listSnapshotFiles() throws IOException {
+ FileStatus[] snapshotFileStatuses = fs.listStatus(snapshotDir, SNAPSHOT_FILE_FILTER);
+ TimestampedFilename[] snapshotFiles = new TimestampedFilename[snapshotFileStatuses.length];
+ for (int i = 0; i < snapshotFileStatuses.length; i++) {
+ snapshotFiles[i] = new TimestampedFilename(snapshotFileStatuses[i].getPath());
+ }
+ return snapshotFiles;
+ }
+
+ @Override
+ public long deleteOldSnapshots(int numberToKeep) throws IOException {
+ TimestampedFilename[] snapshots = listSnapshotFiles();
+ if (snapshots.length == 0) {
+ return -1;
+ }
+ Arrays.sort(snapshots, Collections.reverseOrder());
+ if (snapshots.length <= numberToKeep) {
+ // nothing to remove, oldest timestamp is the last snapshot
+ return snapshots[snapshots.length - 1].getTimestamp();
+ }
+ int toRemoveCount = snapshots.length - numberToKeep;
+ TimestampedFilename[] toRemove = new TimestampedFilename[toRemoveCount];
+ System.arraycopy(snapshots, numberToKeep, toRemove, 0, toRemoveCount);
+
+ for (TimestampedFilename f : toRemove) {
+ LOG.debug("Removing old snapshot file {}", f.getPath());
+ fs.delete(f.getPath(), false);
+ }
+ long oldestTimestamp = snapshots[numberToKeep - 1].getTimestamp();
+ LOG.debug("Removed {} old snapshot files prior to {}", toRemoveCount, oldestTimestamp);
+ return oldestTimestamp;
+ }
+
+ @Override
+ public List<String> listSnapshots() throws IOException {
+ FileStatus[] files = fs.listStatus(snapshotDir, SNAPSHOT_FILE_FILTER);
+ return Lists.transform(Arrays.asList(files), new Function<FileStatus, String>() {
+ @Nullable
+ @Override
+ public String apply(@Nullable FileStatus input) {
+ return input.getPath().getName();
+ }
+ });
+ }
+
+ @Override
+ public List<TransactionLog> getLogsSince(long timestamp) throws IOException {
+ FileStatus[] statuses = fs.listStatus(snapshotDir, new LogFileFilter(timestamp, Long.MAX_VALUE));
+ TimestampedFilename[] timestampedFiles = new TimestampedFilename[statuses.length];
+ for (int i = 0; i < statuses.length; i++) {
+ timestampedFiles[i] = new TimestampedFilename(statuses[i].getPath());
+ }
+ return Lists.transform(Arrays.asList(timestampedFiles), new Function<TimestampedFilename, TransactionLog>() {
+ @Nullable
+ @Override
+ public TransactionLog apply(@Nullable TimestampedFilename input) {
+ return openLog(input.getPath(), input.getTimestamp());
+ }
+ });
+ }
+
+ @Override
+ public TransactionLog createLog(long timestamp) throws IOException {
+ Path newLog = new Path(snapshotDir, LOG_FILE_PREFIX + timestamp);
+ return openLog(newLog, timestamp);
+ }
+
+ private TransactionLog openLog(Path path, long timestamp) {
+ return new HDFSTransactionLog(fs, hConf, path, timestamp, metricsCollector);
+ }
+
+ @Override
+ public void deleteLogsOlderThan(long timestamp) throws IOException {
+ FileStatus[] statuses = fs.listStatus(snapshotDir, new LogFileFilter(0, timestamp));
+ int removedCnt = 0;
+ for (FileStatus status : statuses) {
+ LOG.debug("Removing old transaction log {}", status.getPath());
+ if (fs.delete(status.getPath(), false)) {
+ removedCnt++;
+ } else {
+ LOG.error("Failed to delete transaction log file {}", status.getPath());
+ }
+ }
+ LOG.debug("Removed {} transaction logs older than {}", removedCnt, timestamp);
+ }
+
+ @Override
+ public void setupStorage() throws IOException {
+ if (!fs.exists(snapshotDir)) {
+ LOG.info("Creating snapshot dir at {}", snapshotDir);
+ fs.mkdirs(snapshotDir);
+ } else {
+ Preconditions.checkState(fs.isDirectory(snapshotDir),
+ "Configured snapshot directory " + snapshotDir + " is not a directory!");
+ }
+ }
+
+ @Override
+ public List<String> listLogs() throws IOException {
+ FileStatus[] files = fs.listStatus(snapshotDir, new LogFileFilter(0, Long.MAX_VALUE));
+ return Lists.transform(Arrays.asList(files), new Function<FileStatus, String>() {
+ @Nullable
+ @Override
+ public String apply(@Nullable FileStatus input) {
+ return input.getPath().getName();
+ }
+ });
+ }
+
+ @Override
+ public String getLocation() {
+ return snapshotDir.toString();
+ }
+
+ private static class LogFileFilter implements PathFilter {
+ // starting time of files to include (inclusive)
+ private final long startTime;
+ // ending time of files to include (exclusive)
+ private final long endTime;
+
+ public LogFileFilter(long startTime, long endTime) {
+ this.startTime = startTime;
+ this.endTime = endTime;
+ }
+
+ @Override
+ public boolean accept(Path path) {
+ if (path.getName().startsWith(LOG_FILE_PREFIX)) {
+ String[] parts = path.getName().split("\\.");
+ if (parts.length == 2) {
+ try {
+ long fileTime = Long.parseLong(parts[1]);
+ return fileTime >= startTime && fileTime < endTime;
+ } catch (NumberFormatException ignored) {
+ LOG.warn("Filename {} did not match the expected pattern prefix.<timestamp>", path.getName());
+ }
+ }
+ }
+ return false;
+ }
+ }
+
+ /**
+ * Represents a filename composed of a prefix and a ".timestamp" suffix. This is useful for manipulating both
+ * snapshot and transaction log filenames.
+ */
+ private static class TimestampedFilename implements Comparable<TimestampedFilename> {
+ private Path path;
+ private String prefix;
+ private long timestamp;
+
+ public TimestampedFilename(Path path) {
+ this.path = path;
+ String[] parts = path.getName().split("\\.");
+ if (parts.length != 2) {
+ throw new IllegalArgumentException("Filename " + path.getName() +
+ " did not match the expected pattern prefix.timestamp");
+ }
+ prefix = parts[0];
+ timestamp = Long.parseLong(parts[1]);
+ }
+
+ public Path getPath() {
+ return path;
+ }
+
+ public String getPrefix() {
+ return prefix;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public int compareTo(TimestampedFilename other) {
+ int res = prefix.compareTo(other.getPrefix());
+ if (res == 0) {
+ res = Longs.compare(timestamp, other.getTimestamp());
+ }
+ return res;
+ }
+ }
+
+ // TODO move this out as a separate command line tool
+ private enum CLIMode { SNAPSHOT, TXLOG };
+ /**
+ * Reads a transaction state snapshot or transaction log from HDFS and prints the entries to stdout.
+ *
+ * Supports the following options:
+ * -s read snapshot state (defaults to the latest)
+ * -l read a transaction log
+ * [filename] reads the given file
+ * @param args
+ */
+ public static void main(String[] args) {
+ List<String> filenames = Lists.newArrayList();
+ CLIMode mode = null;
+ for (String arg : args) {
+ if ("-s".equals(arg)) {
+ mode = CLIMode.SNAPSHOT;
+ } else if ("-l".equals(arg)) {
+ mode = CLIMode.TXLOG;
+ } else if ("-h".equals(arg)) {
+ printUsage(null);
+ } else {
+ filenames.add(arg);
+ }
+ }
+
+ if (mode == null) {
+ printUsage("ERROR: Either -s or -l is required to set mode.", 1);
+ }
+
+ Configuration config = new ConfigurationFactory().get();
+
+ // Use the no-op metrics collector. We are being run as a command line tool, so there are no relevant metrics
+ // to report
+ HDFSTransactionStateStorage storage =
+ new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new TxMetricsCollector());
+ storage.startAndWait();
+ try {
+ switch (mode) {
+ case SNAPSHOT:
+ try {
+ if (filenames.isEmpty()) {
+ TransactionSnapshot snapshot = storage.getLatestSnapshot();
+ printSnapshot(snapshot);
+ }
+ for (String file : filenames) {
+ Path path = new Path(file);
+ TransactionSnapshot snapshot = storage.readSnapshotFile(path);
+ printSnapshot(snapshot);
+ System.out.println();
+ }
+ } catch (IOException ioe) {
+ System.err.println("Error reading snapshot files: " + ioe.getMessage());
+ ioe.printStackTrace();
+ System.exit(1);
+ }
+ break;
+ case TXLOG:
+ if (filenames.isEmpty()) {
+ printUsage("ERROR: At least one transaction log filename is required!", 1);
+ }
+ for (String file : filenames) {
+ TimestampedFilename timestampedFilename = new TimestampedFilename(new Path(file));
+ TransactionLog log = storage.openLog(timestampedFilename.getPath(), timestampedFilename.getTimestamp());
+ printLog(log);
+ System.out.println();
+ }
+ break;
+ }
+ } finally {
+ storage.stop();
+ }
+ }
+
+ private static void printUsage(String message) {
+ printUsage(message, 0);
+ }
+
+ private static void printUsage(String message, int exitCode) {
+ if (message != null) {
+ System.out.println(message);
+ }
+ System.out.println("Usage: java " + HDFSTransactionStateStorage.class.getName() + " (-s|-l) file1 [file2...]");
+ System.out.println();
+ System.out.println("\t-s\tRead files as transaction state snapshots (will default to latest if no file given)");
+ System.out.println("\t-l\tRead files as transaction logs [filename is required]");
+ System.out.println("\t-h\tPrint this message");
+ System.exit(exitCode);
+ }
+
+ private static void printSnapshot(TransactionSnapshot snapshot) {
+ Date snapshotDate = new Date(snapshot.getTimestamp());
+ System.out.println("TransactionSnapshot at " + snapshotDate.toString());
+ System.out.println("\t" + snapshot.toString());
+ }
+
+ private static void printLog(TransactionLog log) {
+ try {
+ System.out.println("TransactionLog " + log.getName());
+ TransactionLogReader reader = log.getReader();
+ TransactionEdit edit;
+ long seq = 0;
+ while ((edit = reader.next()) != null) {
+ System.out.println(String.format(" %d: %s", seq++, edit.toString()));
+ }
+ } catch (IOException ioe) {
+ System.err.println("ERROR reading log " + log.getName() + ": " + ioe.getMessage());
+ ioe.printStackTrace();
+ }
+ }
+}