You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/06 23:02:39 UTC
[33/51] [partial] incubator-tephra git commit: Rename package to
org.apache.tephra
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/HDFSUtil.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSUtil.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSUtil.java
new file mode 100644
index 0000000..4b9e646
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSUtil.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tephra.persist;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.Method;
+
+/**
+ * Utility for handling HDFS file lease recovery. This is a copy-n-paste fork of
+ * {@link org.apache.hadoop.hbase.util.FSHDFSUtils} from the latest HBase 0.94 version (as of 0.94.12),
+ * which contains some additional fixes not present in our current HBase dependency version --
+ * mainly checking the return value of the {@code DistributedFileSystem.recoverLease()} call to verify that
+ * recovery succeeded.
+ */
+public class HDFSUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(HDFSUtil.class);
+
+  /**
+   * Recover the lease from HDFS, retrying multiple times.
+   *
+   * @param fs The filesystem holding the file; nothing is done unless it is a {@link DistributedFileSystem}.
+   * @param p Path of the file whose lease should be recovered.
+   * @param conf Configuration supplying the recovery timeout and pause settings.
+   * @throws IOException if the file does not exist or the wait between attempts is interrupted.
+   */
+  public void recoverFileLease(final FileSystem fs, final Path p,
+                               Configuration conf)
+    throws IOException {
+    // lease recovery not needed for local file system case.
+    if (!(fs instanceof DistributedFileSystem)) {
+      return;
+    }
+    recoverDFSFileLease((DistributedFileSystem) fs, p, conf);
+  }
+
+  /*
+   * Run the dfs recover lease. recoverLease is asynchronous. It returns:
+   *    - false when it starts the lease recovery (i.e. lease recovery not *yet* done)
+   *    - true when the lease recovery has succeeded or the file is closed.
+   * But, we have to be careful.  Each time we call recoverLease, it starts the recover lease
+   * process over from the beginning.  We could put ourselves in a situation where we are
+   * doing nothing but starting a recovery, interrupting it to start again, and so on.
+   * The findings over in HBASE-8354 have it that the namenode will try to recover the lease
+   * on the file's primary node.  If all is well, it should return near immediately.  But,
+   * as is common, it is the very primary node that has crashed and so the namenode will be
+   * stuck waiting on a socket timeout before it will ask another datanode to start the
+   * recovery. It does not help if we call recoverLease in the meantime and in particular,
+   * subsequent to the socket timeout, a recoverLease invocation will cause us to start
+   * over from square one (possibly waiting on socket timeout against primary node).  So,
+   * in the below, we do the following:
+   * 1. Call recoverLease.
+   * 2. If it returns true, break.
+   * 3. If it returns false, wait a few seconds and then call it again.
+   * 4. If it returns true, break.
+   * 5. If it returns false, wait for what we think the datanode socket timeout is
+   *    (configurable) and then try again.
+   * 6. If it returns true, break.
+   * 7. If it returns false, repeat starting at step 5. above.
+   *
+   * If HDFS-4525 is available, call it every second and we might be able to exit early.
+   *
+   * Returns true when the lease was recovered (or the file was found to be closed), and false
+   * when the overall recovery timeout elapsed first.
+   */
+  boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p,
+                              final Configuration conf)
+    throws IOException {
+    LOG.info("Recovering lease on dfs file " + p);
+    long startWaiting = System.currentTimeMillis();
+    // Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS
+    // usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves
+    // beyond that limit 'to be safe'.
+    long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 900000) + startWaiting;
+    // This setting should be what the cluster dfs heartbeat is set to.
+    long firstPause = conf.getInt("hbase.lease.recovery.first.pause", 3000);
+    // This should be set to how long it'll take for us to timeout against primary datanode if it
+    // is dead.  We set it to 61 seconds, 1 second more than the default READ_TIMEOUT in HDFS, the
+    // default value for DFS_CLIENT_SOCKET_TIMEOUT_KEY.
+    long subsequentPause = conf.getInt("hbase.lease.recovery.dfs.timeout", 61 * 1000);
+    // How long to sleep between isFileClosed polls while waiting out 'subsequentPause'.
+    // Read once up front: the value does not change between iterations.
+    long pollInterval = conf.getInt("hbase.lease.recovery.pause", 1000);
+
+    Method isFileClosedMeth = null;
+    // whether we need to look for isFileClosed method
+    boolean findIsFileClosedMeth = true;
+    boolean recovered = false;
+    // We break the loop if we succeed the lease recovery, timeout, or we throw an exception.
+    for (int nbAttempt = 0; !recovered; nbAttempt++) {
+      recovered = recoverLease(dfs, nbAttempt, p, startWaiting);
+      if (recovered || checkIfTimedout(conf, recoveryTimeout, nbAttempt, p, startWaiting)) {
+        break;
+      }
+      try {
+        // On the first time through wait the short 'firstPause'.
+        if (nbAttempt == 0) {
+          Thread.sleep(firstPause);
+        } else {
+          // Cycle here until subsequentPause elapses.  While spinning, check isFileClosed if
+          // available (should be in hadoop 2.0.5... not in hadoop 1 though).
+          long localStartWaiting = System.currentTimeMillis();
+          while ((System.currentTimeMillis() - localStartWaiting) < subsequentPause) {
+            Thread.sleep(pollInterval);
+            if (findIsFileClosedMeth) {
+              // Look the method up reflectively, and only once, since it may not exist
+              // in the Hadoop version on the classpath.
+              try {
+                isFileClosedMeth = dfs.getClass().getMethod("isFileClosed", Path.class);
+              } catch (NoSuchMethodException nsme) {
+                LOG.debug("isFileClosed not available");
+              } finally {
+                findIsFileClosedMeth = false;
+              }
+            }
+            if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) {
+              recovered = true;
+              break;
+            }
+          }
+        }
+      } catch (InterruptedException ie) {
+        // Surface the interruption as an IOException subtype so it fits this method's signature.
+        InterruptedIOException iioe = new InterruptedIOException();
+        iioe.initCause(ie);
+        throw iioe;
+      }
+    }
+    return recovered;
+  }
+
+  /**
+   * Checks whether the overall lease recovery deadline has passed, logging a warning if so.
+   *
+   * @param conf Configuration, used only to report the configured timeout in the log message.
+   * @param recoveryTimeout Absolute deadline, in milliseconds since the epoch.
+   * @param nbAttempt Count number of the current attempt.
+   * @param p Path of the file to recover.
+   * @param startWaiting Timestamp of when we started attempting to recover the file lease.
+   * @return True if the deadline is already in the past.
+   */
+  boolean checkIfTimedout(final Configuration conf, final long recoveryTimeout,
+                          final int nbAttempt, final Path p, final long startWaiting) {
+    if (recoveryTimeout < System.currentTimeMillis()) {
+      LOG.warn("Cannot recoverLease after trying for " +
+                 conf.getInt("hbase.lease.recovery.timeout", 900000) +
+                 "ms (hbase.lease.recovery.timeout); continuing, but may be DATALOSS!!!; " +
+                 getLogMessageDetail(nbAttempt, p, startWaiting));
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Try to recover the lease.
+   * @param dfs The filesystem instance.
+   * @param nbAttempt Count number of this attempt.
+   * @param p Path of the file to recover.
+   * @param startWaiting Timestamp of when we started attempting to recover the file lease.
+   * @return True if dfs#recoverLease came by true; false if it returned false or failed with an
+   *         {@link IOException} other than "file not found" (which is logged).
+   * @throws java.io.FileNotFoundException if the file to recover does not exist.
+   */
+  boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt, final Path p,
+                       final long startWaiting)
+    throws FileNotFoundException {
+    boolean recovered = false;
+    try {
+      recovered = dfs.recoverLease(p);
+      LOG.info("recoverLease=" + recovered + ", " +
+                 getLogMessageDetail(nbAttempt, p, startWaiting));
+    } catch (IOException e) {
+      // The message may be null; guard it so a message-less exception is logged and retried
+      // instead of escaping as a NullPointerException.
+      String message = e.getMessage();
+      if (e instanceof LeaseExpiredException && message != null && message.contains("File does not exist")) {
+        // This exception comes out instead of FNFE, fix it
+        throw new FileNotFoundException("The given file wasn't found at " + p);
+      } else if (e instanceof FileNotFoundException) {
+        throw (FileNotFoundException) e;
+      }
+      LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e);
+    }
+    return recovered;
+  }
+
+  /**
+   * @param nbAttempt Attempt number for the lease recovery.
+   * @param p Path of the file to recover.
+   * @param startWaiting Timestamp of when we started attempting to recover the file lease.
+   * @return Detail to append to any log message around lease recovering.
+   */
+  private String getLogMessageDetail(final int nbAttempt, final Path p, final long startWaiting) {
+    return "attempt=" + nbAttempt + " on file=" + p + " after " +
+      (System.currentTimeMillis() - startWaiting) + "ms";
+  }
+
+  /**
+   * Call HDFS-4525 isFileClosed if it is available.
+   * @param dfs Filesystem instance to use.
+   * @param m Method instance to call.
+   * @param p Path of the file to check is closed.
+   * @return True if file is closed; false if it is not, or if the reflective call failed.
+   */
+  private boolean isFileClosed(final DistributedFileSystem dfs, final Method m, final Path p) {
+    try {
+      return (Boolean) m.invoke(dfs, p);
+    } catch (SecurityException e) {
+      LOG.warn("No access", e);
+    } catch (Exception e) {
+      LOG.warn("Failed invocation for " + p.toString(), e);
+    }
+    return false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionLog.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionLog.java b/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionLog.java
new file mode 100644
index 0000000..d81ba38
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionLog.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import org.apache.tephra.metrics.MetricsCollector;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+/**
+ * A transaction log backed by a single {@link File} on the local filesystem.
+ */
+public class LocalFileTransactionLog extends AbstractTransactionLog {
+  private final File logFile;
+
+  /**
+   * Creates a new transaction log using the given file instance.
+   * @param logFile The log file to use.
+   * @param timestamp Passed through to the {@link AbstractTransactionLog} constructor.
+   * @param metricsCollector Passed through to the {@link AbstractTransactionLog} constructor.
+   */
+  public LocalFileTransactionLog(File logFile, long timestamp, MetricsCollector metricsCollector) {
+    super(timestamp, metricsCollector);
+    this.logFile = logFile;
+  }
+
+  /** Returns the absolute path of the backing file. */
+  @Override
+  public String getName() {
+    return logFile.getAbsolutePath();
+  }
+
+  /** Opens a new writer on the backing file. */
+  @Override
+  protected TransactionLogWriter createWriter() throws IOException {
+    return new LogWriter(logFile);
+  }
+
+  /** Opens a new reader on the backing file. */
+  @Override
+  public TransactionLogReader getReader() throws IOException {
+    return new LogReader(logFile);
+  }
+
+  /**
+   * Writes log entries to the file through a buffered {@link DataOutputStream}.
+   */
+  private static final class LogWriter implements TransactionLogWriter {
+    private final FileOutputStream fileOut;
+    private final DataOutputStream dataOut;
+
+    public LogWriter(File logFile) throws IOException {
+      FileOutputStream rawStream = new FileOutputStream(logFile);
+      BufferedOutputStream buffered =
+        new BufferedOutputStream(rawStream, LocalFileTransactionStateStorage.BUFFER_SIZE);
+      this.fileOut = rawStream;
+      this.dataOut = new DataOutputStream(buffered);
+    }
+
+    @Override
+    public void append(Entry entry) throws IOException {
+      entry.write(dataOut);
+    }
+
+    @Override
+    public void commitMarker(int count) throws IOException {
+      // skip for local file
+    }
+
+    @Override
+    public void sync() throws IOException {
+      // Pushes buffered entries down to the underlying file stream.
+      dataOut.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+      dataOut.flush();
+      dataOut.close();
+      fileOut.close();
+    }
+  }
+
+  /**
+   * Reads log entries back from the file through a buffered {@link DataInputStream}.
+   */
+  private static final class LogReader implements TransactionLogReader {
+    private final FileInputStream fileIn;
+    private final DataInputStream dataIn;
+    // Scratch entry whose key is overwritten on every call to next(TransactionEdit).
+    private final Entry scratchEntry = new Entry();
+
+    public LogReader(File logFile) throws IOException {
+      FileInputStream rawStream = new FileInputStream(logFile);
+      BufferedInputStream buffered =
+        new BufferedInputStream(rawStream, LocalFileTransactionStateStorage.BUFFER_SIZE);
+      this.fileIn = rawStream;
+      this.dataIn = new DataInputStream(buffered);
+    }
+
+    /**
+     * Reads the next entry into a freshly allocated {@link Entry}.
+     * @return The edit of the entry read, or null once the end of the file is reached.
+     */
+    @Override
+    public TransactionEdit next() throws IOException {
+      Entry fresh = new Entry();
+      try {
+        fresh.readFields(dataIn);
+      } catch (EOFException endOfFile) {
+        // signal end of file by returning null
+        return null;
+      }
+      return fresh.getEdit();
+    }
+
+    /**
+     * Reads the next entry, filling the caller-supplied edit instead of allocating a new one.
+     * @param reuse The edit instance to populate.
+     * @return The populated {@code reuse} instance, or null once the end of the file is reached.
+     */
+    @Override
+    public TransactionEdit next(TransactionEdit reuse) throws IOException {
+      try {
+        // The entry key precedes the edit in the stream; read it first, then the edit itself.
+        scratchEntry.getKey().readFields(dataIn);
+        reuse.readFields(dataIn);
+      } catch (EOFException endOfFile) {
+        // signal end of file by returning null
+        return null;
+      }
+      return reuse;
+    }
+
+    @Override
+    public void close() throws IOException {
+      dataIn.close();
+      fileIn.close();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionStateStorage.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionStateStorage.java b/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionStateStorage.java
new file mode 100644
index 0000000..beddbb2
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionStateStorage.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+import com.google.common.primitives.Longs;
+import com.google.inject.Inject;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.metrics.MetricsCollector;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * Persists transaction snapshots and write-ahead logs to files on the local filesystem.
+ */
+public class LocalFileTransactionStateStorage extends AbstractTransactionStateStorage {
+ private static final String TMP_SNAPSHOT_FILE_PREFIX = ".in-progress.";
+ private static final String SNAPSHOT_FILE_PREFIX = "snapshot.";
+ private static final String LOG_FILE_PREFIX = "txlog.";
+ private static final Logger LOG = LoggerFactory.getLogger(LocalFileTransactionStateStorage.class);
+ static final int BUFFER_SIZE = 16384;
+
+ private static final FilenameFilter SNAPSHOT_FILE_FILTER = new FilenameFilter() {
+ @Override
+ public boolean accept(File file, String s) {
+ return s.startsWith(SNAPSHOT_FILE_PREFIX);
+ }
+ };
+
+ private final String configuredSnapshotDir;
+ private final MetricsCollector metricsCollector;
+ private File snapshotDir;
+
+ @Inject
+ public LocalFileTransactionStateStorage(Configuration conf, SnapshotCodecProvider codecProvider,
+ MetricsCollector metricsCollector) {
+ super(codecProvider);
+ this.configuredSnapshotDir = conf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR);
+ this.metricsCollector = metricsCollector;
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ Preconditions.checkState(configuredSnapshotDir != null,
+ "Snapshot directory is not configured. Please set " + TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR +
+ " in configuration.");
+ snapshotDir = new File(configuredSnapshotDir);
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ // nothing to do
+ }
+
+ @Override
+ public String getLocation() {
+ return snapshotDir.getAbsolutePath();
+ }
+
+ @Override
+ public void writeSnapshot(TransactionSnapshot snapshot) throws IOException {
+ // save the snapshot to a temporary file
+ File snapshotTmpFile = new File(snapshotDir, TMP_SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp());
+ LOG.debug("Writing snapshot to temporary file {}", snapshotTmpFile);
+ OutputStream out = Files.newOutputStreamSupplier(snapshotTmpFile).getOutput();
+ boolean threw = true;
+ try {
+ codecProvider.encode(out, snapshot);
+ threw = false;
+ } finally {
+ Closeables.close(out, threw);
+ }
+
+ // move the temporary file into place with the correct filename
+ File finalFile = new File(snapshotDir, SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp());
+ if (!snapshotTmpFile.renameTo(finalFile)) {
+ throw new IOException("Failed renaming temporary snapshot file " + snapshotTmpFile.getName() + " to " +
+ finalFile.getName());
+ }
+
+ LOG.debug("Completed snapshot to file {}", finalFile);
+ }
+
+ @Override
+ public TransactionSnapshot getLatestSnapshot() throws IOException {
+ InputStream is = getLatestSnapshotInputStream();
+ if (is == null) {
+ return null;
+ }
+ try {
+ return readSnapshotFile(is);
+ } finally {
+ is.close();
+ }
+ }
+
+ @Override
+ public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException {
+ InputStream is = getLatestSnapshotInputStream();
+ if (is == null) {
+ return null;
+ }
+ try {
+ return codecProvider.decodeTransactionVisibilityState(is);
+ } finally {
+ is.close();
+ }
+ }
+
+ private InputStream getLatestSnapshotInputStream() throws IOException {
+ File[] snapshotFiles = snapshotDir.listFiles(SNAPSHOT_FILE_FILTER);
+ TimestampedFilename mostRecent = null;
+ for (File file : snapshotFiles) {
+ TimestampedFilename tsFile = new TimestampedFilename(file);
+ if (mostRecent == null || tsFile.compareTo(mostRecent) > 0) {
+ mostRecent = tsFile;
+ }
+ }
+
+ if (mostRecent == null) {
+ LOG.info("No snapshot files found in {}", snapshotDir.getAbsolutePath());
+ return null;
+ }
+
+ return new FileInputStream(mostRecent.getFile());
+ }
+
+ private TransactionSnapshot readSnapshotFile(InputStream is) throws IOException {
+ return codecProvider.decode(is);
+ }
+
+ @Override
+ public long deleteOldSnapshots(int numberToKeep) throws IOException {
+ File[] snapshotFiles = snapshotDir.listFiles(SNAPSHOT_FILE_FILTER);
+ if (snapshotFiles.length == 0) {
+ return -1;
+ }
+ TimestampedFilename[] snapshotFilenames = new TimestampedFilename[snapshotFiles.length];
+ for (int i = 0; i < snapshotFiles.length; i++) {
+ snapshotFilenames[i] = new TimestampedFilename(snapshotFiles[i]);
+ }
+ Arrays.sort(snapshotFilenames, Collections.reverseOrder());
+ if (snapshotFilenames.length <= numberToKeep) {
+ // nothing to delete, just return the oldest timestamp
+ return snapshotFilenames[snapshotFilenames.length - 1].getTimestamp();
+ }
+ int toRemoveCount = snapshotFilenames.length - numberToKeep;
+ TimestampedFilename[] toRemove = new TimestampedFilename[toRemoveCount];
+ System.arraycopy(snapshotFilenames, numberToKeep, toRemove, 0, toRemoveCount);
+ int removedCnt = 0;
+ for (int i = 0; i < toRemove.length; i++) {
+ File currentFile = toRemove[i].getFile();
+ LOG.debug("Removing old snapshot file {}", currentFile.getAbsolutePath());
+ if (!toRemove[i].getFile().delete()) {
+ LOG.error("Failed deleting snapshot file {}", currentFile.getAbsolutePath());
+ } else {
+ removedCnt++;
+ }
+ }
+ long oldestTimestamp = snapshotFilenames[numberToKeep - 1].getTimestamp();
+ LOG.info("Removed {} out of {} expected snapshot files older than {}", removedCnt, toRemoveCount, oldestTimestamp);
+ return oldestTimestamp;
+ }
+
+ @Override
+ public List<String> listSnapshots() throws IOException {
+ File[] snapshots = snapshotDir.listFiles(SNAPSHOT_FILE_FILTER);
+ return Lists.transform(Arrays.asList(snapshots), new Function<File, String>() {
+ @Nullable
+ @Override
+ public String apply(@Nullable File input) {
+ return input.getName();
+ }
+ });
+ }
+
+ @Override
+ public List<TransactionLog> getLogsSince(long timestamp) throws IOException {
+ File[] logFiles = snapshotDir.listFiles(new LogFileFilter(timestamp, Long.MAX_VALUE));
+ TimestampedFilename[] timestampedFiles = new TimestampedFilename[logFiles.length];
+ for (int i = 0; i < logFiles.length; i++) {
+ timestampedFiles[i] = new TimestampedFilename(logFiles[i]);
+ }
+ // logs need to be processed in ascending order
+ Arrays.sort(timestampedFiles);
+ return Lists.transform(Arrays.asList(timestampedFiles), new Function<TimestampedFilename, TransactionLog>() {
+ @Nullable
+ @Override
+ public TransactionLog apply(@Nullable TimestampedFilename input) {
+ return new LocalFileTransactionLog(input.getFile(), input.getTimestamp(), metricsCollector);
+ }
+ });
+ }
+
+ @Override
+ public TransactionLog createLog(long timestamp) throws IOException {
+ File newLogFile = new File(snapshotDir, LOG_FILE_PREFIX + timestamp);
+ LOG.info("Creating new transaction log at {}", newLogFile.getAbsolutePath());
+ return new LocalFileTransactionLog(newLogFile, timestamp, metricsCollector);
+ }
+
+ @Override
+ public void deleteLogsOlderThan(long timestamp) throws IOException {
+ File[] logFiles = snapshotDir.listFiles(new LogFileFilter(0, timestamp));
+ int removedCnt = 0;
+ for (File file : logFiles) {
+ LOG.debug("Removing old transaction log {}", file.getPath());
+ if (file.delete()) {
+ removedCnt++;
+ } else {
+ LOG.warn("Failed to remove log file {}", file.getAbsolutePath());
+ }
+ }
+ LOG.debug("Removed {} transaction logs older than {}", removedCnt, timestamp);
+ }
+
+ @Override
+ public void setupStorage() throws IOException {
+ // create the directory if it doesn't exist
+ if (!snapshotDir.exists()) {
+ if (!snapshotDir.mkdirs()) {
+ throw new IOException("Failed to create directory " + configuredSnapshotDir +
+ " for transaction snapshot storage");
+ }
+ } else {
+ Preconditions.checkState(snapshotDir.isDirectory(),
+ "Configured snapshot directory " + configuredSnapshotDir + " is not a directory!");
+ Preconditions.checkState(snapshotDir.canWrite(), "Configured snapshot directory " +
+ configuredSnapshotDir + " exists but is not writable!");
+ }
+ }
+
+ @Override
+ public List<String> listLogs() throws IOException {
+ File[] logs = snapshotDir.listFiles(new LogFileFilter(0, Long.MAX_VALUE));
+ return Lists.transform(Arrays.asList(logs), new Function<File, String>() {
+ @Nullable
+ @Override
+ public String apply(@Nullable File input) {
+ return input.getName();
+ }
+ });
+ }
+
+ private static class LogFileFilter implements FilenameFilter {
+ private final long startTime;
+ private final long endTime;
+
+ public LogFileFilter(long startTime, long endTime) {
+ this.startTime = startTime;
+ this.endTime = endTime;
+ }
+
+ @Override
+ public boolean accept(File file, String s) {
+ if (s.startsWith(LOG_FILE_PREFIX)) {
+ String[] parts = s.split("\\.");
+ if (parts.length == 2) {
+ try {
+ long fileTime = Long.parseLong(parts[1]);
+ return fileTime >= startTime && fileTime < endTime;
+ } catch (NumberFormatException ignored) {
+ LOG.warn("Filename {} did not match the expected pattern prefix.<timestamp>", s);
+ }
+ }
+ }
+ return false;
+ }
+ }
+
+ /**
+ * Represents a filename composed of a prefix and a ".timestamp" suffix. This is useful for manipulating both
+ * snapshot and transaction log filenames.
+ */
+ private static class TimestampedFilename implements Comparable<TimestampedFilename> {
+ private File file;
+ private String prefix;
+ private long timestamp;
+
+ public TimestampedFilename(File file) {
+ this.file = file;
+ String[] parts = file.getName().split("\\.");
+ if (parts.length != 2) {
+ throw new IllegalArgumentException("Filename " + file.getName() +
+ " did not match the expected pattern prefix.timestamp");
+ }
+ prefix = parts[0];
+ timestamp = Long.parseLong(parts[1]);
+ }
+
+ public File getFile() {
+ return file;
+ }
+
+ public String getPrefix() {
+ return prefix;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public int compareTo(TimestampedFilename other) {
+ int res = prefix.compareTo(other.getPrefix());
+ if (res == 0) {
+ res = Longs.compare(timestamp, other.getTimestamp());
+ }
+ return res;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/NoOpTransactionStateStorage.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/NoOpTransactionStateStorage.java b/tephra-core/src/main/java/org/apache/tephra/persist/NoOpTransactionStateStorage.java
new file mode 100644
index 0000000..12f2475
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/NoOpTransactionStateStorage.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.inject.Inject;
+import org.apache.tephra.snapshot.SnapshotCodec;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link TransactionStateStorage} that persists nothing: writes are discarded, reads find no
+ * snapshot, and listings come back empty.
+ */
+public class NoOpTransactionStateStorage extends AbstractIdleService implements TransactionStateStorage {
+
+  private final SnapshotCodec codec;
+
+  @Inject
+  public NoOpTransactionStateStorage(SnapshotCodecProvider codecProvider) {
+    this.codec = codecProvider;
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    // no resources to acquire
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    // no resources to release
+  }
+
+  /**
+   * Encodes the snapshot to the supplied stream using the codec. This is the only operation of
+   * this class that produces output.
+   */
+  @Override
+  public void writeSnapshot(OutputStream out, TransactionSnapshot snapshot) throws IOException {
+    codec.encode(out, snapshot);
+  }
+
+  @Override
+  public void writeSnapshot(TransactionSnapshot snapshot) throws IOException {
+    // the snapshot is discarded
+  }
+
+  /** Always returns null, since no snapshot is ever stored. */
+  @Override
+  public TransactionSnapshot getLatestSnapshot() throws IOException {
+    return null;
+  }
+
+  /** Always returns null, since no snapshot is ever stored. */
+  @Override
+  public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException {
+    return null;
+  }
+
+  /** Deletes nothing and always returns 0. */
+  @Override
+  public long deleteOldSnapshots(int numberToKeep) throws IOException {
+    return 0;
+  }
+
+  /** Always returns a new, empty list. */
+  @Override
+  public List<String> listSnapshots() throws IOException {
+    List<String> none = new ArrayList<>(0);
+    return none;
+  }
+
+  /** Always returns a new, empty list. */
+  @Override
+  public List<TransactionLog> getLogsSince(long timestamp) throws IOException {
+    List<TransactionLog> none = new ArrayList<>(0);
+    return none;
+  }
+
+  /** Returns a log that discards every edit appended to it. */
+  @Override
+  public TransactionLog createLog(long timestamp) throws IOException {
+    return new NoOpTransactionLog();
+  }
+
+  @Override
+  public void deleteLogsOlderThan(long timestamp) throws IOException {
+    // there are no logs to delete
+  }
+
+  @Override
+  public void setupStorage() throws IOException {
+    // there is no storage to prepare
+  }
+
+  /** Always returns a new, empty list. */
+  @Override
+  public List<String> listLogs() throws IOException {
+    List<String> none = new ArrayList<>(0);
+    return none;
+  }
+
+  @Override
+  public String getLocation() {
+    return "no-op";
+  }
+
+  /**
+   * Transaction log that drops every edit and whose reader is always at end-of-log.
+   */
+  private static class NoOpTransactionLog implements TransactionLog {
+    // Captured when the log object is created; the timestamp given to createLog is not used.
+    private final long timestamp = System.currentTimeMillis();
+
+    @Override
+    public String getName() {
+      return "no-op";
+    }
+
+    @Override
+    public long getTimestamp() {
+      return timestamp;
+    }
+
+    @Override
+    public void append(TransactionEdit edit) throws IOException {
+      // the edit is discarded
+    }
+
+    @Override
+    public void append(List<TransactionEdit> edits) throws IOException {
+      // the edits are discarded
+    }
+
+    @Override
+    public void close() {
+      // nothing to close
+    }
+
+    @Override
+    public TransactionLogReader getReader() {
+      return new NoOpTransactionLogReader();
+    }
+  }
+
+  /**
+   * Reader that reports end-of-log (null) on every read.
+   */
+  private static final class NoOpTransactionLogReader implements TransactionLogReader {
+    @Override
+    public TransactionEdit next() {
+      return null;
+    }
+
+    @Override
+    public TransactionEdit next(TransactionEdit reuse) {
+      return null;
+    }
+
+    @Override
+    public void close() {
+      // nothing to close
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java
new file mode 100644
index 0000000..1d07e72
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.io.Writable;
+import org.apache.tephra.ChangeId;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionType;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Represents a transaction state change in the {@link TransactionLog}.
+ */
+public class TransactionEdit implements Writable {
+
+ /**
+ * The possible state changes for a transaction.
+ */
+ public enum State {
+ INPROGRESS, COMMITTING, COMMITTED, INVALID, ABORTED, MOVE_WATERMARK, TRUNCATE_INVALID_TX, CHECKPOINT
+ }
+
+ private long writePointer;
+
+ /**
+ * Stores the visibility upper bound
+ * (see {@link TransactionManager.InProgressTx#getVisibilityUpperBound()}).
+ * Only populated for edits of type {@link State#INPROGRESS}.
+ */
+ private long visibilityUpperBound;
+ private long commitPointer;
+ private long expirationDate;
+ private State state;
+ private Set<ChangeId> changes;
+ /** Whether or not the COMMITTED change should be fully committed. */
+ private boolean canCommit;
+ private TransactionType type;
+ private Set<Long> truncateInvalidTx;
+ private long truncateInvalidTxTime;
+ private long parentWritePointer;
+ private long[] checkpointPointers;
+
+ // for Writable
+ public TransactionEdit() {
+ this.changes = Sets.newHashSet();
+ this.truncateInvalidTx = Sets.newHashSet();
+ }
+
+ // package private for testing
+ TransactionEdit(long writePointer, long visibilityUpperBound, State state, long expirationDate,
+ Set<ChangeId> changes, long commitPointer, boolean canCommit, TransactionType type,
+ Set<Long> truncateInvalidTx, long truncateInvalidTxTime, long parentWritePointer,
+ long[] checkpointPointers) {
+ this.writePointer = writePointer;
+ this.visibilityUpperBound = visibilityUpperBound;
+ this.state = state;
+ this.expirationDate = expirationDate;
+ this.changes = changes != null ? changes : Collections.<ChangeId>emptySet();
+ this.commitPointer = commitPointer;
+ this.canCommit = canCommit;
+ this.type = type;
+ this.truncateInvalidTx = truncateInvalidTx != null ? truncateInvalidTx : Collections.<Long>emptySet();
+ this.truncateInvalidTxTime = truncateInvalidTxTime;
+ this.parentWritePointer = parentWritePointer;
+ this.checkpointPointers = checkpointPointers;
+ }
+
+ /**
+ * Returns the transaction write pointer assigned for the state change.
+ */
+ public long getWritePointer() {
+ return writePointer;
+ }
+
+ void setWritePointer(long writePointer) {
+ this.writePointer = writePointer;
+ }
+
+ public long getVisibilityUpperBound() {
+ return visibilityUpperBound;
+ }
+
+ void setVisibilityUpperBound(long visibilityUpperBound) {
+ this.visibilityUpperBound = visibilityUpperBound;
+ }
+
+ /**
+ * Returns the type of state change represented.
+ */
+ public State getState() {
+ return state;
+ }
+
+ void setState(State state) {
+ this.state = state;
+ }
+
+ /**
+ * Returns any expiration timestamp (in milliseconds) associated with the state change. This should only
+ * be populated for changes of type {@link State#INPROGRESS}.
+ */
+ public long getExpiration() {
+ return expirationDate;
+ }
+
+ void setExpiration(long expirationDate) {
+ this.expirationDate = expirationDate;
+ }
+
+ /**
+ * @return the set of changed row keys associated with the state change. This is only populated for edits
+ * of type {@link State#COMMITTING} or {@link State#COMMITTED}.
+ */
+ public Set<ChangeId> getChanges() {
+ return changes;
+ }
+
+ void setChanges(Set<ChangeId> changes) {
+ this.changes = changes;
+ }
+
+ /**
+ * Returns the write pointer used to commit the row key change set. This is only populated for edits of type
+ * {@link State#COMMITTED}.
+ */
+ public long getCommitPointer() {
+ return commitPointer;
+ }
+
+ void setCommitPointer(long commitPointer) {
+ this.commitPointer = commitPointer;
+ }
+
+ /**
+ * Returns whether or not the transaction should be moved to the committed set. This is only populated for edits
+ * of type {@link State#COMMITTED}.
+ */
+ public boolean getCanCommit() {
+ return canCommit;
+ }
+
+ void setCanCommit(boolean canCommit) {
+ this.canCommit = canCommit;
+ }
+
+ /**
+ * Returns the transaction type. This is only populated for edits of type {@link State#INPROGRESS} or
+ * {@link State#ABORTED}.
+ */
+ public TransactionType getType() {
+ return type;
+ }
+
+ void setType(TransactionType type) {
+ this.type = type;
+ }
+
+ /**
+ * Returns the transaction ids to be removed from invalid transaction list. This is only populated for
+ * edits of type {@link State#TRUNCATE_INVALID_TX}
+ */
+ public Set<Long> getTruncateInvalidTx() {
+ return truncateInvalidTx;
+ }
+
+ void setTruncateInvalidTx(Set<Long> truncateInvalidTx) {
+ this.truncateInvalidTx = truncateInvalidTx;
+ }
+
+ /**
+ * Returns the time until which the invalid transactions need to be truncated from invalid transaction list.
+ * This is only populated for edits of type {@link State#TRUNCATE_INVALID_TX}
+ */
+ public long getTruncateInvalidTxTime() {
+ return truncateInvalidTxTime;
+ }
+
+ void setTruncateInvalidTxTime(long truncateInvalidTxTime) {
+ this.truncateInvalidTxTime = truncateInvalidTxTime;
+ }
+
+ /**
+ * Returns the parent write pointer for a checkpoint operation. This is only populated for edits of type
+ * {@link State#CHECKPOINT}
+ */
+ public long getParentWritePointer() {
+ return parentWritePointer;
+ }
+
+ void setParentWritePointer(long parentWritePointer) {
+ this.parentWritePointer = parentWritePointer;
+ }
+
+ /**
+ * Returns the checkpoint write pointers for the edit. This is only populated for edits of type
+ * {@link State#ABORTED}.
+ */
+ public long[] getCheckpointPointers() {
+ return checkpointPointers;
+ }
+
+ void setCheckpointPointers(long[] checkpointPointers) {
+ this.checkpointPointers = checkpointPointers;
+ }
+
+ /**
+ * Creates a new instance in the {@link State#INPROGRESS} state.
+ */
+ public static TransactionEdit createStarted(long writePointer, long visibilityUpperBound,
+ long expirationDate, TransactionType type) {
+ return new TransactionEdit(writePointer, visibilityUpperBound, State.INPROGRESS,
+ expirationDate, null, 0L, false, type, null, 0L, 0L, null);
+ }
+
+ /**
+ * Creates a new instance in the {@link State#COMMITTING} state.
+ */
+ public static TransactionEdit createCommitting(long writePointer, Set<ChangeId> changes) {
+ return new TransactionEdit(writePointer, 0L, State.COMMITTING, 0L, changes, 0L, false, null, null, 0L, 0L, null);
+ }
+
+ /**
+ * Creates a new instance in the {@link State#COMMITTED} state.
+ */
+ public static TransactionEdit createCommitted(long writePointer, Set<ChangeId> changes, long nextWritePointer,
+ boolean canCommit) {
+ return new TransactionEdit(writePointer, 0L, State.COMMITTED, 0L, changes, nextWritePointer, canCommit, null,
+ null, 0L, 0L, null);
+ }
+
+ /**
+ * Creates a new instance in the {@link State#ABORTED} state.
+ */
+ public static TransactionEdit createAborted(long writePointer, TransactionType type, long[] checkpointPointers) {
+ return new TransactionEdit(writePointer, 0L, State.ABORTED, 0L, null, 0L, false, type, null, 0L, 0L,
+ checkpointPointers);
+ }
+
+ /**
+ * Creates a new instance in the {@link State#INVALID} state.
+ */
+ public static TransactionEdit createInvalid(long writePointer) {
+ return new TransactionEdit(writePointer, 0L, State.INVALID, 0L, null, 0L, false, null, null, 0L, 0L, null);
+ }
+
+ /**
+ * Creates a new instance in the {@link State#MOVE_WATERMARK} state.
+ */
+ public static TransactionEdit createMoveWatermark(long writePointer) {
+ return new TransactionEdit(writePointer, 0L, State.MOVE_WATERMARK, 0L, null, 0L, false, null, null, 0L, 0L, null);
+ }
+
+ /**
+ * Creates a new instance in the {@link State#TRUNCATE_INVALID_TX} state.
+ */
+ public static TransactionEdit createTruncateInvalidTx(Set<Long> truncateInvalidTx) {
+ return new TransactionEdit(0L, 0L, State.TRUNCATE_INVALID_TX, 0L, null, 0L, false, null, truncateInvalidTx,
+ 0L, 0L, null);
+ }
+
+ /**
+ * Creates a new instance in the {@link State#TRUNCATE_INVALID_TX} state that carries a truncation time
+ * instead of an explicit set of transaction ids.
+ */
+ public static TransactionEdit createTruncateInvalidTxBefore(long truncateInvalidTxTime) {
+ return new TransactionEdit(0L, 0L, State.TRUNCATE_INVALID_TX, 0L, null, 0L, false, null, null,
+ truncateInvalidTxTime, 0L, null);
+ }
+
+ /**
+ * Creates a new instance in the {@link State#CHECKPOINT} state.
+ */
+ public static TransactionEdit createCheckpoint(long writePointer, long parentWritePointer) {
+ return new TransactionEdit(writePointer, 0L, State.CHECKPOINT, 0L, null, 0L, false, null, null, 0L,
+ parentWritePointer, null);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ TransactionEditCodecs.encode(this, out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ TransactionEditCodecs.decode(this, in);
+ }
+
+ @Override
+ public final boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof TransactionEdit)) {
+ return false;
+ }
+
+ TransactionEdit that = (TransactionEdit) o;
+
+ return Objects.equal(this.writePointer, that.writePointer) &&
+ Objects.equal(this.visibilityUpperBound, that.visibilityUpperBound) &&
+ Objects.equal(this.commitPointer, that.commitPointer) &&
+ Objects.equal(this.expirationDate, that.expirationDate) &&
+ Objects.equal(this.state, that.state) &&
+ Objects.equal(this.changes, that.changes) &&
+ Objects.equal(this.canCommit, that.canCommit) &&
+ Objects.equal(this.type, that.type) &&
+ Objects.equal(this.truncateInvalidTx, that.truncateInvalidTx) &&
+ Objects.equal(this.truncateInvalidTxTime, that.truncateInvalidTxTime) &&
+ Objects.equal(this.parentWritePointer, that.parentWritePointer) &&
+ Arrays.equals(this.checkpointPointers, that.checkpointPointers);
+ }
+
+ /**
+  * Computes a hash code over the same fields that {@link #equals(Object)} compares.
+  */
+ @Override
+ public final int hashCode() {
+   // checkpointPointers is an array: hash it by content (Arrays.hashCode) so that two edits which are
+   // equal according to equals(), which uses Arrays.equals, also produce the same hash code.
+   return Objects.hashCode(writePointer, visibilityUpperBound, commitPointer, expirationDate, state, changes,
+     canCommit, type, truncateInvalidTx, truncateInvalidTxTime, parentWritePointer,
+     Arrays.hashCode(checkpointPointers));
+ }
+
+ /**
+  * Returns a human-readable description of this edit. The change set is summarized by its size only;
+  * all other fields are printed by value.
+  */
+ @Override
+ public String toString() {
+   return Objects.toStringHelper(this)
+     .add("writePointer", writePointer)
+     .add("visibilityUpperBound", visibilityUpperBound)
+     .add("commitPointer", commitPointer)
+     .add("expiration", expirationDate)
+     .add("state", state)
+     .add("changesSize", changes != null ? changes.size() : 0)
+     .add("canCommit", canCommit)
+     .add("type", type)
+     .add("truncateInvalidTx", truncateInvalidTx)
+     .add("truncateInvalidTxTime", truncateInvalidTxTime)
+     .add("parentWritePointer", parentWritePointer)
+     // render the array contents explicitly instead of relying on how the helper treats array values
+     .add("checkpointPointers", Arrays.toString(checkpointPointers))
+     .toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEditCodecs.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEditCodecs.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEditCodecs.java
new file mode 100644
index 0000000..387ad41
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEditCodecs.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+import org.apache.tephra.ChangeId;
+import org.apache.tephra.TransactionType;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * Utilities to handle encoding and decoding of {@link TransactionEdit} entries, while maintaining compatibility
+ * with older versions of the serialized data.
+ */
+public class TransactionEditCodecs {
+
+ private static final TransactionEditCodec[] ALL_CODECS = {
+ new TransactionEditCodecV1(),
+ new TransactionEditCodecV2(),
+ new TransactionEditCodecV3(),
+ new TransactionEditCodecV4()
+ };
+
+ private static final SortedMap<Byte, TransactionEditCodec> CODECS = new TreeMap<>();
+ static {
+ for (TransactionEditCodec codec : ALL_CODECS) {
+ CODECS.put(codec.getVersion(), codec);
+ }
+ }
+
+ /**
+  * Reads one serialized {@code TransactionEdit} from the given input and stores the decoded values in
+  * {@code dest}. The first byte read from the {@code DataInput} is taken as the version of the codec
+  * that was used to serialize the entry.
+  *
+  * @param dest the transaction edit to populate with the deserialized values
+  * @param in the input stream containing the encoded data
+  * @throws IOException if no codec is registered for the version byte, or if an error occurs while
+  *   deserializing from the input stream
+  */
+ public static void decode(TransactionEdit dest, DataInput in) throws IOException {
+   byte codecVersion = in.readByte();
+   TransactionEditCodec matching = CODECS.get(codecVersion);
+   if (matching != null) {
+     matching.decode(dest, in);
+     return;
+   }
+   throw new IOException("TransactionEdit was serialized with an unknown codec version " + codecVersion +
+     ". Was it written with a newer version of Tephra?");
+ }
+
+ /**
+  * Serializes the given {@code TransactionEdit} using the most recent codec. The codec's version byte is
+  * written first, so that {@link #decode(TransactionEdit, DataInput)} can pick the matching codec when
+  * the entry is read back.
+  *
+  * @param src the transaction edit to serialize
+  * @param out the output stream to contain the data
+  * @throws IOException if an error occurs while serializing to the output stream
+  */
+ public static void encode(TransactionEdit src, DataOutput out) throws IOException {
+   // codec versions decrease as codecs get newer, so the smallest key of the sorted map is the latest
+   Byte newestVersion = CODECS.firstKey();
+   TransactionEditCodec newestCodec = CODECS.get(newestVersion);
+   out.writeByte(newestCodec.getVersion());
+   newestCodec.encode(src, out);
+ }
+
+ /**
+ * Encodes the given transaction edit using a specific codec. Note that this is only exposed
+ * for use by tests.
+ */
+ @VisibleForTesting
+ static void encode(TransactionEdit src, DataOutput out, TransactionEditCodec codec) throws IOException {
+ out.writeByte(codec.getVersion());
+ codec.encode(src, out);
+ }
+
+ /**
+ * Defines the interface used for encoding and decoding {@link TransactionEdit} instances to and from
+ * binary representations.
+ */
+ interface TransactionEditCodec {
+ /**
+ * Reads the encoded values from the data input stream and sets the fields in the given {@code TransactionEdit}
+ * instance.
+ *
+ * @param dest the instance on which to set all the deserialized values
+ * @param in the input stream containing the serialized data
+ * @throws IOException if an error occurs while deserializing the data
+ */
+ void decode(TransactionEdit dest, DataInput in) throws IOException;
+
+ /**
+ * Writes all the field values from the {@code TransactionEdit} instance in serialized form to the data
+ * output stream.
+ *
+ * @param src the instance to serialize to the stream
+ * @param out the output stream to contain the data
+ * @throws IOException if an error occurs while serializing the data
+ */
+ void encode(TransactionEdit src, DataOutput out) throws IOException;
+
+ /**
+ * Returns the version number for this codec. Each codec should use a unique version number, with the newest
+ * codec having the lowest number.
+ */
+ byte getVersion();
+ }
+
+
+ // package-private for unit-test access
+ static class TransactionEditCodecV1 implements TransactionEditCodec {
+ /**
+  * Reads the V1 layout: write pointer, state ordinal, expiration, commit pointer, canCommit flag and the
+  * change set (an entry count followed by length-prefixed keys).
+  *
+  * @throws IOException if the state ordinal does not map to a {@link TransactionEdit.State}, or if reading
+  *   from the input fails
+  */
+ @Override
+ public void decode(TransactionEdit dest, DataInput in) throws IOException {
+   dest.setWritePointer(in.readLong());
+
+   int stateIdx = in.readInt();
+   TransactionEdit.State[] knownStates = TransactionEdit.State.values();
+   if (stateIdx < 0 || stateIdx >= knownStates.length) {
+     throw new IOException("State enum ordinal value is out of range: " + stateIdx);
+   }
+   dest.setState(knownStates[stateIdx]);
+
+   dest.setExpiration(in.readLong());
+   dest.setCommitPointer(in.readLong());
+   dest.setCanCommit(in.readBoolean());
+
+   Set<ChangeId> decodedChanges = Sets.newHashSet();
+   int remaining = in.readInt();
+   while (remaining-- > 0) {
+     byte[] key = new byte[in.readInt()];
+     in.readFully(key);
+     decodedChanges.add(new ChangeId(key));
+   }
+   dest.setChanges(decodedChanges);
+
+   // V1 did not store the visibility upper bound. It is safe to set it to 0: this may decrease
+   // performance until this tx is finished, but correctness will be preserved.
+   dest.setVisibilityUpperBound(0);
+ }
+
+ /** @deprecated use {@link TransactionEditCodecs.TransactionEditCodecV4} instead, it is still here for
+ * unit-tests only */
+ @Override
+ @Deprecated
+ public void encode(TransactionEdit src, DataOutput out) throws IOException {
+ out.writeLong(src.getWritePointer());
+ // use ordinal for predictable size, though this does not support evolution
+ out.writeInt(src.getState().ordinal());
+ out.writeLong(src.getExpiration());
+ out.writeLong(src.getCommitPointer());
+ out.writeBoolean(src.getCanCommit());
+ Set<ChangeId> changes = src.getChanges();
+ if (changes == null) {
+ out.writeInt(0);
+ } else {
+ out.writeInt(changes.size());
+ for (ChangeId c : changes) {
+ byte[] cKey = c.getKey();
+ out.writeInt(cKey.length);
+ out.write(cKey);
+ }
+ }
+ // NOTE: we didn't have visibilityUpperBound in V1, it was added in V2
+ // we didn't have transaction type, truncateInvalidTx and truncateInvalidTxTime in V1 and V2,
+ // it was added in V3
+ }
+
+ @Override
+ public byte getVersion() {
+ return -1;
+ }
+ }
+
+ // package-private for unit-test access
+ static class TransactionEditCodecV2 extends TransactionEditCodecV1 implements TransactionEditCodec {
+ @Override
+ public void decode(TransactionEdit dest, DataInput in) throws IOException {
+ super.decode(dest, in);
+ dest.setVisibilityUpperBound(in.readLong());
+ }
+
+ /** @deprecated use {@link TransactionEditCodecs.TransactionEditCodecV4} instead, it is still here for
+ * unit-tests only */
+ @Override
+ public void encode(TransactionEdit src, DataOutput out) throws IOException {
+ super.encode(src, out);
+ out.writeLong(src.getVisibilityUpperBound());
+ // NOTE: we didn't have transaction type, truncateInvalidTx and truncateInvalidTxTime in V1 and V2,
+ // it was added in V3
+ }
+
+ @Override
+ public byte getVersion() {
+ return -2;
+ }
+ }
+
+ // TODO: refactor to avoid duplicate code among different version of codecs
+ // package-private for unit-test access
+ static class TransactionEditCodecV3 extends TransactionEditCodecV2 implements TransactionEditCodec {
+ /**
+  * Reads the fields handled by the parent codec, followed by the V3 additions: the transaction type
+  * ordinal ({@code -1} when there is no type), the ids of the invalid transactions to truncate, and the
+  * truncation time.
+  *
+  * @throws IOException if the type ordinal does not map to a {@link TransactionType}, or if reading from
+  *   the input fails
+  */
+ @Override
+ public void decode(TransactionEdit dest, DataInput in) throws IOException {
+   super.decode(dest, in);
+   int typeIdx = in.readInt();
+   // null transaction type is represented as -1
+   if (typeIdx < 0) {
+     dest.setType(null);
+   } else {
+     TransactionType[] knownTypes = TransactionType.values();
+     if (typeIdx >= knownTypes.length) {
+       throw new IOException("Type enum ordinal value is out of range: " + typeIdx);
+     }
+     dest.setType(knownTypes[typeIdx]);
+   }
+
+   int truncateInvalidTxSize = in.readInt();
+   // Always decode into a fresh set instead of clearing and refilling the one currently held by dest:
+   // that set may be the immutable Collections.emptySet() installed by the TransactionEdit constructor
+   // (add() would throw), or a set owned by whoever created the edit (it would be silently modified).
+   Set<Long> truncateInvalidTx = Sets.newHashSet();
+   for (int i = 0; i < truncateInvalidTxSize; i++) {
+     truncateInvalidTx.add(in.readLong());
+   }
+   dest.setTruncateInvalidTx(truncateInvalidTx);
+   dest.setTruncateInvalidTxTime(in.readLong());
+ }
+
+ @Override
+ public void encode(TransactionEdit src, DataOutput out) throws IOException {
+ super.encode(src, out);
+ // null transaction type is represented as -1
+ if (src.getType() == null) {
+ out.writeInt(-1);
+ } else {
+ out.writeInt(src.getType().ordinal());
+ }
+
+ Set<Long> truncateInvalidTx = src.getTruncateInvalidTx();
+ if (truncateInvalidTx == null) {
+ out.writeInt(0);
+ } else {
+ out.writeInt(truncateInvalidTx.size());
+ for (long id : truncateInvalidTx) {
+ out.writeLong(id);
+ }
+ }
+ out.writeLong(src.getTruncateInvalidTxTime());
+ }
+
+ @Override
+ public byte getVersion() {
+ return -3;
+ }
+ }
+
+ static class TransactionEditCodecV4 extends TransactionEditCodecV3 {
+ /**
+  * Reads the fields handled by the parent codec, followed by the V4 additions: the parent write pointer
+  * and the checkpoint write pointers (a length followed by that many values).
+  *
+  * @throws IOException if reading from the input fails
+  */
+ @Override
+ public void decode(TransactionEdit dest, DataInput in) throws IOException {
+   super.decode(dest, in);
+   dest.setParentWritePointer(in.readLong());
+   int checkpointPointersLen = in.readInt();
+   // A negative length stands for a null array. The field is assigned in that case as well, so that a
+   // destination instance that is being reused does not keep the pointers of a previously decoded edit.
+   long[] checkpointPointers = null;
+   if (checkpointPointersLen >= 0) {
+     checkpointPointers = new long[checkpointPointersLen];
+     for (int i = 0; i < checkpointPointersLen; i++) {
+       checkpointPointers[i] = in.readLong();
+     }
+   }
+   dest.setCheckpointPointers(checkpointPointers);
+ }
+
+ /**
+  * Writes the fields handled by the parent codec, followed by the parent write pointer and the checkpoint
+  * write pointers. A {@code null} pointer array is written as length {@code -1} with no values.
+  *
+  * @throws IOException if writing to the output fails
+  */
+ @Override
+ public void encode(TransactionEdit src, DataOutput out) throws IOException {
+   super.encode(src, out);
+   out.writeLong(src.getParentWritePointer());
+
+   long[] pointers = src.getCheckpointPointers();
+   out.writeInt(pointers != null ? pointers.length : -1);
+   if (pointers != null) {
+     for (long pointer : pointers) {
+       out.writeLong(pointer);
+     }
+   }
+ }
+
+ @Override
+ public byte getVersion() {
+ return -4;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLog.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLog.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLog.java
new file mode 100644
index 0000000..e453523
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLog.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Represents a log of transaction state changes.
+ */
+public interface TransactionLog {
+
+ String getName();
+
+ long getTimestamp();
+
+ void append(TransactionEdit edit) throws IOException;
+
+ void append(List<TransactionEdit> edits) throws IOException;
+
+ void close() throws IOException;
+
+ TransactionLogReader getReader() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogReader.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogReader.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogReader.java
new file mode 100644
index 0000000..51fda0b
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Represents a reader for {@link TransactionLog} instances.
+ */
+public interface TransactionLogReader extends Closeable {
+ /**
+ * Returns the next {@code TransactionEdit} from the log file, based on the current position, or {@code null}
+ * if the end of the file has been reached.
+ */
+ TransactionEdit next() throws IOException;
+
+ /**
+ * Populates {@code reuse} with the next {@code TransactionEdit}, based on the reader's current position in the
+ * log file.
+ * @param reuse The {@code TransactionEdit} instance to populate with the log entry data.
+ * @return The {@code TransactionEdit} instance, or {@code null} if the end of the file has been reached.
+ * @throws IOException If an error is encountered reading the log data.
+ */
+ TransactionEdit next(TransactionEdit reuse) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogWriter.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogWriter.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogWriter.java
new file mode 100644
index 0000000..14893ac
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogWriter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Common interface for transaction log writers used by classes extending {@link AbstractTransactionLog}.
+ */
+public interface TransactionLogWriter extends Closeable {
+ /**
+ * Adds a new transaction entry to the log. Note that this does not guarantee that the entry has been flushed
+ * to persistent storage until {@link #sync()} has been called.
+ *
+ * @param entry The transaction edit to append.
+ * @throws IOException If an error occurs while writing the edit to storage.
+ */
+ void append(AbstractTransactionLog.Entry entry) throws IOException;
+
+ /**
+ * Writes a marker recording the number of transaction entries that will follow in the log as part of a
+ * single sync.
+ *
+ * @param count Number of transaction entries.
+ * @throws IOException If an error occurs while writing the count to storage.
+ */
+ void commitMarker(int count) throws IOException;
+
+ /**
+ * Syncs any pending transaction edits added through {@link #append(AbstractTransactionLog.Entry)},
+ * but not yet flushed to durable storage.
+ *
+ * @throws IOException If an error occurs while flushing the outstanding edits.
+ */
+ void sync() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java
new file mode 100644
index 0000000..ccf7374
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.tephra.ChangeId;
+import org.apache.tephra.TransactionManager;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Represents an in-memory snapshot of the full transaction state.
+ */
+public class TransactionSnapshot implements TransactionVisibilityState {
+ private long timestamp;
+ private long readPointer;
+ private long writePointer;
+ private Collection<Long> invalid;
+ private NavigableMap<Long, TransactionManager.InProgressTx> inProgress;
+ private Map<Long, Set<ChangeId>> committingChangeSets;
+ private Map<Long, Set<ChangeId>> committedChangeSets;
+
+ public TransactionSnapshot(long timestamp, long readPointer, long writePointer, Collection<Long> invalid,
+ NavigableMap<Long, TransactionManager.InProgressTx> inProgress,
+ Map<Long, Set<ChangeId>> committing, Map<Long, Set<ChangeId>> committed) {
+ this(timestamp, readPointer, writePointer, invalid, inProgress);
+ this.committingChangeSets = committing;
+ this.committedChangeSets = committed;
+ }
+
+ public TransactionSnapshot(long timestamp, long readPointer, long writePointer, Collection<Long> invalid,
+ NavigableMap<Long, TransactionManager.InProgressTx> inProgress) {
+ this.timestamp = timestamp;
+ this.readPointer = readPointer;
+ this.writePointer = writePointer;
+ this.invalid = invalid;
+ this.inProgress = inProgress;
+ this.committingChangeSets = Collections.emptyMap();
+ this.committedChangeSets = Collections.emptyMap();
+ }
+
+ @Override
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public long getReadPointer() {
+ return readPointer;
+ }
+
+ @Override
+ public long getWritePointer() {
+ return writePointer;
+ }
+
+ @Override
+ public Collection<Long> getInvalid() {
+ return invalid;
+ }
+
+ @Override
+ public NavigableMap<Long, TransactionManager.InProgressTx> getInProgress() {
+ return inProgress;
+ }
+
+ @Override
+ public long getVisibilityUpperBound() {
+ // the readPointer of the oldest in-progress tx is the oldest in use
+ // todo: potential problem with not moving visibility upper bound for the whole duration of long-running tx
+ Map.Entry<Long, TransactionManager.InProgressTx> firstInProgress = inProgress.firstEntry();
+ if (firstInProgress == null) {
+ // using readPointer as smallest visible when no txs are there
+ return readPointer;
+ }
+ return firstInProgress.getValue().getVisibilityUpperBound();
+ }
+
+ /**
+ * Returns a map of transaction write pointer to sets of changed row keys for transactions that had called
+ * {@code InMemoryTransactionManager.canCommit(Transaction, Collection)} but not yet called
+ * {@code InMemoryTransactionManager.commit(Transaction)} at the time of the snapshot.
+ *
+ * @return a map of transaction write pointer to set of changed row keys.
+ */
+ public Map<Long, Set<ChangeId>> getCommittingChangeSets() {
+ return committingChangeSets;
+ }
+
+ /**
+ * Returns a map of transaction write pointer to set of changed row keys for transactions that had successfully called
+ * {@code InMemoryTransactionManager.commit(Transaction)} at the time of the snapshot.
+ *
+ * @return a map of transaction write pointer to set of changed row keys.
+ */
+ public Map<Long, Set<ChangeId>> getCommittedChangeSets() {
+ return committedChangeSets;
+ }
+
+ /**
+ * Checks that this instance matches another {@code TransactionSnapshot} instance. Note that the equality check
+ * ignores the snapshot timestamp value, but includes all other properties.
+ *
+ * @param obj the other instance to check for equality.
+ * @return {@code true} if the instances are equal, {@code false} if not.
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof TransactionSnapshot)) {
+ return false;
+ }
+ TransactionSnapshot other = (TransactionSnapshot) obj;
+ return readPointer == other.readPointer &&
+ writePointer == other.writePointer &&
+ invalid.equals(other.invalid) &&
+ inProgress.equals(other.inProgress) &&
+ committingChangeSets.equals(other.committingChangeSets) &&
+ committedChangeSets.equals(other.committedChangeSets);
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("timestamp", timestamp)
+ .add("readPointer", readPointer)
+ .add("writePointer", writePointer)
+ .add("invalidSize", invalid.size())
+ .add("inProgressSize", inProgress.size())
+ .add("committingSize", committingChangeSets.size())
+ .add("committedSize", committedChangeSets.size())
+ .toString();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(readPointer, writePointer, invalid, inProgress, committingChangeSets, committedChangeSets);
+ }
+
+ /**
+ * Creates a new {@code TransactionSnapshot} instance with copies of all of the individual collections.
+ * @param snapshotTime timestamp of when the snapshot was created
+ * @param readPointer current transaction read pointer
+ * @param writePointer current transaction write pointer
+ * @param invalid current list of invalid write pointers
+ * @param inProgress current map of write pointers to in-progress transactions
+ * @param committing current map of write pointers to change sets past {@code canCommit()} but not yet committed
+ * @param committed current map of write pointers to change sets which have committed
+ * @return a new {@code TransactionSnapshot} instance
+ */
+ public static TransactionSnapshot copyFrom(long snapshotTime, long readPointer,
+ long writePointer, Collection<Long> invalid,
+ NavigableMap<Long, TransactionManager.InProgressTx> inProgress,
+ Map<Long, Set<ChangeId>> committing,
+ NavigableMap<Long, Set<ChangeId>> committed) {
+ // copy invalid IDs
+ Collection<Long> invalidCopy = Lists.newArrayList(invalid);
+ // copy in-progress IDs and expirations
+ NavigableMap<Long, TransactionManager.InProgressTx> inProgressCopy = Maps.newTreeMap(inProgress);
+
+ // for committing and committed maps, we need to copy each individual Set as well to prevent modification
+ Map<Long, Set<ChangeId>> committingCopy = Maps.newHashMap();
+ for (Map.Entry<Long, Set<ChangeId>> entry : committing.entrySet()) {
+ committingCopy.put(entry.getKey(), new HashSet<>(entry.getValue()));
+ }
+
+ NavigableMap<Long, Set<ChangeId>> committedCopy = new TreeMap<>();
+ for (Map.Entry<Long, Set<ChangeId>> entry : committed.entrySet()) {
+ committedCopy.put(entry.getKey(), new HashSet<>(entry.getValue()));
+ }
+
+ return new TransactionSnapshot(snapshotTime, readPointer, writePointer,
+ invalidCopy, inProgressCopy, committingCopy, committedCopy);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/TransactionStateStorage.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionStateStorage.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionStateStorage.java
new file mode 100644
index 0000000..f7edc8b
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionStateStorage.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.util.concurrent.Service;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+
+/**
+ * Defines the common contract for persisting transaction state changes.
+ */
+public interface TransactionStateStorage extends Service {
+
+ /**
+ * Persists a snapshot of transaction state to an output stream.
+ */
+ public void writeSnapshot(OutputStream out, TransactionSnapshot snapshot) throws IOException;
+
+ /**
+ * Persists a snapshot of transaction state.
+ */
+ public void writeSnapshot(TransactionSnapshot snapshot) throws IOException;
+
+ /**
+ * Returns the most recent snapshot that has been successfully written. Note that this may return {@code null}
+ * if no completed snapshot files are found.
+ */
+ public TransactionSnapshot getLatestSnapshot() throws IOException;
+
+ /**
+ * Returns the most recent transaction visibility state that has been successfully written.
+ * Note that this may return {@code null} if no completed snapshot files are found.
+ * @return {@link TransactionVisibilityState}
+ */
+ public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException;
+
+ /**
+ * Removes any snapshots prior to the {@code numberToKeep} most recent.
+ *
+ * @param numberToKeep The number of most recent snapshots to keep.
+ * @throws IOException If an error occurs while deleting old snapshots.
+ * @return The timestamp of the oldest snapshot kept.
+ */
+ public long deleteOldSnapshots(int numberToKeep) throws IOException;
+
+ /**
+ * Returns the (non-qualified) names of available snapshots.
+ */
+ public List<String> listSnapshots() throws IOException;
+
+ /**
+ * Returns all {@link TransactionLog}s with a timestamp greater than or equal to the given timestamp. Note that
+ * the returned list is guaranteed to be sorted in ascending timestamp order.
+ */
+ public List<TransactionLog> getLogsSince(long timestamp) throws IOException;
+
+ /**
+ * Creates a new {@link TransactionLog}.
+ */
+ public TransactionLog createLog(long timestamp) throws IOException;
+
+ /**
+ * Returns the (non-qualified) names of available logs.
+ */
+ public List<String> listLogs() throws IOException;
+
+ /**
+ * Removes any transaction logs with a timestamp older than the given value. Logs must be removed based on timestamp
+ * to ensure we can fully recover state based on a given snapshot.
+ * @param timestamp The timestamp to delete up to. Logs with a timestamp less than this value will be removed.
+ * @throws IOException If an error occurs while removing logs.
+ */
+ public void deleteLogsOlderThan(long timestamp) throws IOException;
+
+ /**
+ * Create the directories required for the transaction state storage.
+ * @throws IOException If an error occurred during the creation of required directories for transaction state storage.
+ */
+ public void setupStorage() throws IOException;
+
+ /**
+ * Returns a string representation of the location used for persistence.
+ */
+ public String getLocation();
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/TransactionVisibilityState.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionVisibilityState.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionVisibilityState.java
new file mode 100644
index 0000000..cf845af
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionVisibilityState.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import org.apache.tephra.TransactionManager;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+
+/**
+ * Transaction Visibility state contains information required by TransactionProcessor CoProcessor
+ * to determine cell visibility.
+ */
+public interface TransactionVisibilityState {
+
+ /**
+ * Returns the timestamp from when this snapshot was created.
+ */
+ long getTimestamp();
+
+ /**
+ * Returns the read pointer at the time of the snapshot.
+ */
+ long getReadPointer();
+
+ /**
+ * Returns the next write pointer at the time of the snapshot.
+ */
+ long getWritePointer();
+
+ /**
+ * Returns the collection of invalid write pointers at the time of the snapshot.
+ */
+ Collection<Long> getInvalid();
+
+ /**
+ * Returns the map of write pointers to in-progress transactions at the time of the snapshot.
+ */
+ NavigableMap<Long, TransactionManager.InProgressTx> getInProgress();
+
+ /**
+ * @return transaction id {@code X} such that any of the transactions newer than {@code X} might be invisible to
+ * some of the currently in-progress transactions or to those that will be started <p>
+ * NOTE: the returned tx id can be invalid.
+ */
+ long getVisibilityUpperBound();
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/package-info.java b/tephra-core/src/main/java/org/apache/tephra/persist/package-info.java
new file mode 100644
index 0000000..01decb0
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains interfaces and implementations for persisting transaction state.
+ */
+package org.apache.tephra.persist;
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/rpc/RPCServiceHandler.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/rpc/RPCServiceHandler.java b/tephra-core/src/main/java/org/apache/tephra/rpc/RPCServiceHandler.java
new file mode 100644
index 0000000..242a6fe
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/rpc/RPCServiceHandler.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tephra.rpc;
+
+/**
+ * Defines lifecycle interface for all rpc handlers.
+ */
+public interface RPCServiceHandler {
+ /** Initializes this handler; declared to throw any {@link Exception}. */
+ void init() throws Exception;
+ /** Destroys this handler, ending its lifecycle; declared to throw any {@link Exception}. */
+ void destroy() throws Exception;
+}