You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/11 20:15:43 UTC
[43/56] [abbrv] [partial] incubator-tephra git commit: Rename package
to org.apache.tephra
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/HDFSUtil.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSUtil.java b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSUtil.java
deleted file mode 100644
index a462566..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSUtil.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package co.cask.tephra.persist;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.lang.reflect.Method;
-
-/**
- * Utility for handling HDFS file lease recovery. This is a copy-n-paste fork of
- * {@link org.apache.hadoop.hbase.util.FSHDFSUtils} from the latest HBase 0.94 version (as of 0.94.12),
- * which contains some additional fixes not present in our current HBase dependency version --
- * mainly checking the return value of the {@code DistributedFileSystem.recoverLease()} call to verify that
- * recovery succeeded.
- */
-public class HDFSUtil {
- private static final Logger LOG = LoggerFactory.getLogger(HDFSUtil.class);
- /**
- * Recover the lease from HDFS, retrying multiple times.
- */
- public void recoverFileLease(final FileSystem fs, final Path p,
- Configuration conf)
- throws IOException {
- // lease recovery not needed for local file system case.
- if (!(fs instanceof DistributedFileSystem)) {
- return;
- }
- recoverDFSFileLease((DistributedFileSystem) fs, p, conf);
- }
-
- /*
- * Run the dfs recover lease. recoverLease is asynchronous. It returns:
- * -false when it starts the lease recovery (i.e. lease recovery not *yet* done)
- * - true when the lease recovery has succeeded or the file is closed.
- * But, we have to be careful. Each time we call recoverLease, it starts the recover lease
- * process over from the beginning. We could put ourselves in a situation where we are
- * doing nothing but starting a recovery, interrupting it to start again, and so on.
- * The findings over in HBASE-8354 have it that the namenode will try to recover the lease
- * on the file's primary node. If all is well, it should return near immediately. But,
- * as is common, it is the very primary node that has crashed and so the namenode will be
- * stuck waiting on a socket timeout before it will ask another datanode to start the
- * recovery. It does not help if we call recoverLease in the meantime and in particular,
- * subsequent to the socket timeout, a recoverLease invocation will cause us to start
- * over from square one (possibly waiting on socket timeout against primary node). So,
- * in the below, we do the following:
- * 1. Call recoverLease.
- * 2. If it returns true, break.
- * 3. If it returns false, wait a few seconds and then call it again.
- * 4. If it returns true, break.
- * 5. If it returns false, wait for what we think the datanode socket timeout is
- * (configurable) and then try again.
- * 6. If it returns true, break.
- * 7. If it returns false, repeat starting at step 5. above.
- *
- * If HDFS-4525 is available, call it every second and we might be able to exit early.
- */
- boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p,
- final Configuration conf)
- throws IOException {
- LOG.info("Recovering lease on dfs file " + p);
- long startWaiting = System.currentTimeMillis();
- // Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS
- // usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves
- // beyond that limit 'to be safe'.
- long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 900000) + startWaiting;
- // This setting should be what the cluster dfs heartbeat is set to.
- long firstPause = conf.getInt("hbase.lease.recovery.first.pause", 3000);
- // This should be set to how long it'll take for us to timeout against primary datanode if it
- // is dead. We set it to 61 seconds, 1 second than the default READ_TIMEOUT in HDFS, the
- // default value for DFS_CLIENT_SOCKET_TIMEOUT_KEY.
- long subsequentPause = conf.getInt("hbase.lease.recovery.dfs.timeout", 61 * 1000);
-
- Method isFileClosedMeth = null;
- // whether we need to look for isFileClosed method
- boolean findIsFileClosedMeth = true;
- boolean recovered = false;
- // We break the loop if we succeed the lease recovery, timeout, or we throw an exception.
- for (int nbAttempt = 0; !recovered; nbAttempt++) {
- recovered = recoverLease(dfs, nbAttempt, p, startWaiting);
- if (recovered || checkIfTimedout(conf, recoveryTimeout, nbAttempt, p, startWaiting)) {
- break;
- }
- try {
- // On the first time through wait the short 'firstPause'.
- if (nbAttempt == 0) {
- Thread.sleep(firstPause);
- } else {
- // Cycle here until subsequentPause elapses. While spinning, check isFileClosed if
- // available (should be in hadoop 2.0.5... not in hadoop 1 though.
- long localStartWaiting = System.currentTimeMillis();
- while ((System.currentTimeMillis() - localStartWaiting) <
- subsequentPause) {
- Thread.sleep(conf.getInt("hbase.lease.recovery.pause", 1000));
- if (findIsFileClosedMeth) {
- try {
- isFileClosedMeth = dfs.getClass().getMethod("isFileClosed",
- new Class[]{ Path.class });
- } catch (NoSuchMethodException nsme) {
- LOG.debug("isFileClosed not available");
- } finally {
- findIsFileClosedMeth = false;
- }
- }
- if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) {
- recovered = true;
- break;
- }
- }
- }
- } catch (InterruptedException ie) {
- InterruptedIOException iioe = new InterruptedIOException();
- iioe.initCause(ie);
- throw iioe;
- }
- }
- return recovered;
- }
-
- boolean checkIfTimedout(final Configuration conf, final long recoveryTimeout,
- final int nbAttempt, final Path p, final long startWaiting) {
- if (recoveryTimeout < System.currentTimeMillis()) {
- LOG.warn("Cannot recoverLease after trying for " +
- conf.getInt("hbase.lease.recovery.timeout", 900000) +
- "ms (hbase.lease.recovery.timeout); continuing, but may be DATALOSS!!!; " +
- getLogMessageDetail(nbAttempt, p, startWaiting));
- return true;
- }
- return false;
- }
-
- /**
- * Try to recover the lease.
- * @param dfs The filesystem instance.
- * @param nbAttempt Count number of this attempt.
- * @param p Path of the file to recover.
- * @param startWaiting Timestamp of when we started attempting to recover the file lease.
- * @return True if dfs#recoverLease came by true.
- * @throws java.io.FileNotFoundException
- */
- boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt, final Path p,
- final long startWaiting)
- throws FileNotFoundException {
- boolean recovered = false;
- try {
- recovered = dfs.recoverLease(p);
- LOG.info("recoverLease=" + recovered + ", " +
- getLogMessageDetail(nbAttempt, p, startWaiting));
- } catch (IOException e) {
- if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) {
- // This exception comes out instead of FNFE, fix it
- throw new FileNotFoundException("The given file wasn't found at " + p);
- } else if (e instanceof FileNotFoundException) {
- throw (FileNotFoundException) e;
- }
- LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e);
- }
- return recovered;
- }
-
- /**
- * @param nbAttempt Attempt number for the lease recovery.
- * @param p Path of the file to recover.
- * @param startWaiting Timestamp of when we started attempting to recover the file lease.
- * @return Detail to append to any log message around lease recovering.
- */
- private String getLogMessageDetail(final int nbAttempt, final Path p, final long startWaiting) {
- return "attempt=" + nbAttempt + " on file=" + p + " after " +
- (System.currentTimeMillis() - startWaiting) + "ms";
- }
-
- /**
- * Call HDFS-4525 isFileClosed if it is available.
- * @param dfs Filesystem instance to use.
- * @param m Method instance to call.
- * @param p Path of the file to check is closed.
- * @return True if file is closed.
- */
- private boolean isFileClosed(final DistributedFileSystem dfs, final Method m, final Path p) {
- try {
- return (Boolean) m.invoke(dfs, p);
- } catch (SecurityException e) {
- LOG.warn("No access", e);
- } catch (Exception e) {
- LOG.warn("Failed invocation for " + p.toString(), e);
- }
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionLog.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionLog.java b/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionLog.java
deleted file mode 100644
index 590aaa5..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionLog.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.metrics.MetricsCollector;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-/**
- * Reads and writes transaction logs against files in the local filesystem.
- */
-public class LocalFileTransactionLog extends AbstractTransactionLog {
- private final File logFile;
-
- /**
- * Creates a new transaction log using the given file instance.
- * @param logFile The log file to use.
- */
- public LocalFileTransactionLog(File logFile, long timestamp, MetricsCollector metricsCollector) {
- super(timestamp, metricsCollector);
- this.logFile = logFile;
- }
-
- @Override
- public String getName() {
- return logFile.getAbsolutePath();
- }
-
- @Override
- protected TransactionLogWriter createWriter() throws IOException {
- return new LogWriter(logFile);
- }
-
- @Override
- public TransactionLogReader getReader() throws IOException {
- return new LogReader(logFile);
- }
-
- private static final class LogWriter implements TransactionLogWriter {
- private final FileOutputStream fos;
- private final DataOutputStream out;
-
- public LogWriter(File logFile) throws IOException {
- this.fos = new FileOutputStream(logFile);
- this.out = new DataOutputStream(new BufferedOutputStream(fos, LocalFileTransactionStateStorage.BUFFER_SIZE));
- }
-
- @Override
- public void append(Entry entry) throws IOException {
- entry.write(out);
- }
-
- @Override
- public void commitMarker(int count) throws IOException {
- // skip for local file
- }
-
- @Override
- public void sync() throws IOException {
- out.flush();
- }
-
- @Override
- public void close() throws IOException {
- out.flush();
- out.close();
- fos.close();
- }
- }
-
- private static final class LogReader implements TransactionLogReader {
- private final FileInputStream fin;
- private final DataInputStream in;
- private Entry reuseEntry = new Entry();
-
- public LogReader(File logFile) throws IOException {
- this.fin = new FileInputStream(logFile);
- this.in = new DataInputStream(new BufferedInputStream(fin, LocalFileTransactionStateStorage.BUFFER_SIZE));
- }
-
- @Override
- public TransactionEdit next() throws IOException {
- Entry entry = new Entry();
- try {
- entry.readFields(in);
- } catch (EOFException eofe) {
- // signal end of file by returning null
- return null;
- }
- return entry.getEdit();
- }
-
- @Override
- public TransactionEdit next(TransactionEdit reuse) throws IOException {
- try {
- reuseEntry.getKey().readFields(in);
- reuse.readFields(in);
- } catch (EOFException eofe) {
- // signal end of file by returning null
- return null;
- }
- return reuse;
- }
-
- @Override
- public void close() throws IOException {
- in.close();
- fin.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionStateStorage.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionStateStorage.java b/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionStateStorage.java
deleted file mode 100644
index 4c3539d..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionStateStorage.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.metrics.MetricsCollector;
-import co.cask.tephra.snapshot.SnapshotCodecProvider;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.io.Closeables;
-import com.google.common.io.Files;
-import com.google.common.primitives.Longs;
-import com.google.inject.Inject;
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import javax.annotation.Nullable;
-
-/**
- * Persists transaction snapshots and write-ahead logs to files on the local filesystem.
- */
-public class LocalFileTransactionStateStorage extends AbstractTransactionStateStorage {
- private static final String TMP_SNAPSHOT_FILE_PREFIX = ".in-progress.";
- private static final String SNAPSHOT_FILE_PREFIX = "snapshot.";
- private static final String LOG_FILE_PREFIX = "txlog.";
- private static final Logger LOG = LoggerFactory.getLogger(LocalFileTransactionStateStorage.class);
- static final int BUFFER_SIZE = 16384;
-
- private static final FilenameFilter SNAPSHOT_FILE_FILTER = new FilenameFilter() {
- @Override
- public boolean accept(File file, String s) {
- return s.startsWith(SNAPSHOT_FILE_PREFIX);
- }
- };
-
- private final String configuredSnapshotDir;
- private final MetricsCollector metricsCollector;
- private File snapshotDir;
-
- @Inject
- public LocalFileTransactionStateStorage(Configuration conf, SnapshotCodecProvider codecProvider,
- MetricsCollector metricsCollector) {
- super(codecProvider);
- this.configuredSnapshotDir = conf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR);
- this.metricsCollector = metricsCollector;
- }
-
- @Override
- protected void startUp() throws Exception {
- Preconditions.checkState(configuredSnapshotDir != null,
- "Snapshot directory is not configured. Please set " + TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR +
- " in configuration.");
- snapshotDir = new File(configuredSnapshotDir);
- }
-
- @Override
- protected void shutDown() throws Exception {
- // nothing to do
- }
-
- @Override
- public String getLocation() {
- return snapshotDir.getAbsolutePath();
- }
-
- @Override
- public void writeSnapshot(TransactionSnapshot snapshot) throws IOException {
- // save the snapshot to a temporary file
- File snapshotTmpFile = new File(snapshotDir, TMP_SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp());
- LOG.debug("Writing snapshot to temporary file {}", snapshotTmpFile);
- OutputStream out = Files.newOutputStreamSupplier(snapshotTmpFile).getOutput();
- boolean threw = true;
- try {
- codecProvider.encode(out, snapshot);
- threw = false;
- } finally {
- Closeables.close(out, threw);
- }
-
- // move the temporary file into place with the correct filename
- File finalFile = new File(snapshotDir, SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp());
- if (!snapshotTmpFile.renameTo(finalFile)) {
- throw new IOException("Failed renaming temporary snapshot file " + snapshotTmpFile.getName() + " to " +
- finalFile.getName());
- }
-
- LOG.debug("Completed snapshot to file {}", finalFile);
- }
-
- @Override
- public TransactionSnapshot getLatestSnapshot() throws IOException {
- InputStream is = getLatestSnapshotInputStream();
- if (is == null) {
- return null;
- }
- try {
- return readSnapshotFile(is);
- } finally {
- is.close();
- }
- }
-
- @Override
- public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException {
- InputStream is = getLatestSnapshotInputStream();
- if (is == null) {
- return null;
- }
- try {
- return codecProvider.decodeTransactionVisibilityState(is);
- } finally {
- is.close();
- }
- }
-
- private InputStream getLatestSnapshotInputStream() throws IOException {
- File[] snapshotFiles = snapshotDir.listFiles(SNAPSHOT_FILE_FILTER);
- TimestampedFilename mostRecent = null;
- for (File file : snapshotFiles) {
- TimestampedFilename tsFile = new TimestampedFilename(file);
- if (mostRecent == null || tsFile.compareTo(mostRecent) > 0) {
- mostRecent = tsFile;
- }
- }
-
- if (mostRecent == null) {
- LOG.info("No snapshot files found in {}", snapshotDir.getAbsolutePath());
- return null;
- }
-
- return new FileInputStream(mostRecent.getFile());
- }
-
- private TransactionSnapshot readSnapshotFile(InputStream is) throws IOException {
- return codecProvider.decode(is);
- }
-
- @Override
- public long deleteOldSnapshots(int numberToKeep) throws IOException {
- File[] snapshotFiles = snapshotDir.listFiles(SNAPSHOT_FILE_FILTER);
- if (snapshotFiles.length == 0) {
- return -1;
- }
- TimestampedFilename[] snapshotFilenames = new TimestampedFilename[snapshotFiles.length];
- for (int i = 0; i < snapshotFiles.length; i++) {
- snapshotFilenames[i] = new TimestampedFilename(snapshotFiles[i]);
- }
- Arrays.sort(snapshotFilenames, Collections.reverseOrder());
- if (snapshotFilenames.length <= numberToKeep) {
- // nothing to delete, just return the oldest timestamp
- return snapshotFilenames[snapshotFilenames.length - 1].getTimestamp();
- }
- int toRemoveCount = snapshotFilenames.length - numberToKeep;
- TimestampedFilename[] toRemove = new TimestampedFilename[toRemoveCount];
- System.arraycopy(snapshotFilenames, numberToKeep, toRemove, 0, toRemoveCount);
- int removedCnt = 0;
- for (int i = 0; i < toRemove.length; i++) {
- File currentFile = toRemove[i].getFile();
- LOG.debug("Removing old snapshot file {}", currentFile.getAbsolutePath());
- if (!toRemove[i].getFile().delete()) {
- LOG.error("Failed deleting snapshot file {}", currentFile.getAbsolutePath());
- } else {
- removedCnt++;
- }
- }
- long oldestTimestamp = snapshotFilenames[numberToKeep - 1].getTimestamp();
- LOG.info("Removed {} out of {} expected snapshot files older than {}", removedCnt, toRemoveCount, oldestTimestamp);
- return oldestTimestamp;
- }
-
- @Override
- public List<String> listSnapshots() throws IOException {
- File[] snapshots = snapshotDir.listFiles(SNAPSHOT_FILE_FILTER);
- return Lists.transform(Arrays.asList(snapshots), new Function<File, String>() {
- @Nullable
- @Override
- public String apply(@Nullable File input) {
- return input.getName();
- }
- });
- }
-
- @Override
- public List<TransactionLog> getLogsSince(long timestamp) throws IOException {
- File[] logFiles = snapshotDir.listFiles(new LogFileFilter(timestamp, Long.MAX_VALUE));
- TimestampedFilename[] timestampedFiles = new TimestampedFilename[logFiles.length];
- for (int i = 0; i < logFiles.length; i++) {
- timestampedFiles[i] = new TimestampedFilename(logFiles[i]);
- }
- // logs need to be processed in ascending order
- Arrays.sort(timestampedFiles);
- return Lists.transform(Arrays.asList(timestampedFiles), new Function<TimestampedFilename, TransactionLog>() {
- @Nullable
- @Override
- public TransactionLog apply(@Nullable TimestampedFilename input) {
- return new LocalFileTransactionLog(input.getFile(), input.getTimestamp(), metricsCollector);
- }
- });
- }
-
- @Override
- public TransactionLog createLog(long timestamp) throws IOException {
- File newLogFile = new File(snapshotDir, LOG_FILE_PREFIX + timestamp);
- LOG.info("Creating new transaction log at {}", newLogFile.getAbsolutePath());
- return new LocalFileTransactionLog(newLogFile, timestamp, metricsCollector);
- }
-
- @Override
- public void deleteLogsOlderThan(long timestamp) throws IOException {
- File[] logFiles = snapshotDir.listFiles(new LogFileFilter(0, timestamp));
- int removedCnt = 0;
- for (File file : logFiles) {
- LOG.debug("Removing old transaction log {}", file.getPath());
- if (file.delete()) {
- removedCnt++;
- } else {
- LOG.warn("Failed to remove log file {}", file.getAbsolutePath());
- }
- }
- LOG.debug("Removed {} transaction logs older than {}", removedCnt, timestamp);
- }
-
- @Override
- public void setupStorage() throws IOException {
- // create the directory if it doesn't exist
- if (!snapshotDir.exists()) {
- if (!snapshotDir.mkdirs()) {
- throw new IOException("Failed to create directory " + configuredSnapshotDir +
- " for transaction snapshot storage");
- }
- } else {
- Preconditions.checkState(snapshotDir.isDirectory(),
- "Configured snapshot directory " + configuredSnapshotDir + " is not a directory!");
- Preconditions.checkState(snapshotDir.canWrite(), "Configured snapshot directory " +
- configuredSnapshotDir + " exists but is not writable!");
- }
- }
-
- @Override
- public List<String> listLogs() throws IOException {
- File[] logs = snapshotDir.listFiles(new LogFileFilter(0, Long.MAX_VALUE));
- return Lists.transform(Arrays.asList(logs), new Function<File, String>() {
- @Nullable
- @Override
- public String apply(@Nullable File input) {
- return input.getName();
- }
- });
- }
-
- private static class LogFileFilter implements FilenameFilter {
- private final long startTime;
- private final long endTime;
-
- public LogFileFilter(long startTime, long endTime) {
- this.startTime = startTime;
- this.endTime = endTime;
- }
-
- @Override
- public boolean accept(File file, String s) {
- if (s.startsWith(LOG_FILE_PREFIX)) {
- String[] parts = s.split("\\.");
- if (parts.length == 2) {
- try {
- long fileTime = Long.parseLong(parts[1]);
- return fileTime >= startTime && fileTime < endTime;
- } catch (NumberFormatException ignored) {
- LOG.warn("Filename {} did not match the expected pattern prefix.<timestamp>", s);
- }
- }
- }
- return false;
- }
- }
-
- /**
- * Represents a filename composed of a prefix and a ".timestamp" suffix. This is useful for manipulating both
- * snapshot and transaction log filenames.
- */
- private static class TimestampedFilename implements Comparable<TimestampedFilename> {
- private File file;
- private String prefix;
- private long timestamp;
-
- public TimestampedFilename(File file) {
- this.file = file;
- String[] parts = file.getName().split("\\.");
- if (parts.length != 2) {
- throw new IllegalArgumentException("Filename " + file.getName() +
- " did not match the expected pattern prefix.timestamp");
- }
- prefix = parts[0];
- timestamp = Long.parseLong(parts[1]);
- }
-
- public File getFile() {
- return file;
- }
-
- public String getPrefix() {
- return prefix;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-
- @Override
- public int compareTo(TimestampedFilename other) {
- int res = prefix.compareTo(other.getPrefix());
- if (res == 0) {
- res = Longs.compare(timestamp, other.getTimestamp());
- }
- return res;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/NoOpTransactionStateStorage.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/NoOpTransactionStateStorage.java b/tephra-core/src/main/java/co/cask/tephra/persist/NoOpTransactionStateStorage.java
deleted file mode 100644
index 3b6d0b7..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/NoOpTransactionStateStorage.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.snapshot.SnapshotCodec;
-import co.cask.tephra.snapshot.SnapshotCodecProvider;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.inject.Inject;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Minimal {@link TransactionStateStorage} implementation that does nothing, i.e. does not maintain any actual state.
- */
-public class NoOpTransactionStateStorage extends AbstractIdleService implements TransactionStateStorage {
-
- private final SnapshotCodec codec;
-
- @Inject
- public NoOpTransactionStateStorage(SnapshotCodecProvider codecProvider) {
- codec = codecProvider;
- }
-
- @Override
- protected void startUp() throws Exception {
- }
-
- @Override
- protected void shutDown() throws Exception {
- }
-
- @Override
- public void writeSnapshot(OutputStream out, TransactionSnapshot snapshot) throws IOException {
- codec.encode(out, snapshot);
- }
-
- @Override
- public void writeSnapshot(TransactionSnapshot snapshot) throws IOException {
- }
-
- @Override
- public TransactionSnapshot getLatestSnapshot() throws IOException {
- return null;
- }
-
- @Override
- public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException {
- return null;
- }
-
- @Override
- public long deleteOldSnapshots(int numberToKeep) throws IOException {
- return 0;
- }
-
- @Override
- public List<String> listSnapshots() throws IOException {
- return new ArrayList<>(0);
- }
-
- @Override
- public List<TransactionLog> getLogsSince(long timestamp) throws IOException {
- return new ArrayList<>(0);
- }
-
- @Override
- public TransactionLog createLog(long timestamp) throws IOException {
- return new NoOpTransactionLog();
- }
-
- @Override
- public void deleteLogsOlderThan(long timestamp) throws IOException {
- }
-
- @Override
- public void setupStorage() throws IOException {
- }
-
- @Override
- public List<String> listLogs() throws IOException {
- return new ArrayList<>(0);
- }
-
- @Override
- public String getLocation() {
- return "no-op";
- }
-
- private static class NoOpTransactionLog implements TransactionLog {
- private long timestamp = System.currentTimeMillis();
-
- @Override
- public String getName() {
- return "no-op";
- }
-
- @Override
- public long getTimestamp() {
- return timestamp;
- }
-
- @Override
- public void append(TransactionEdit edit) throws IOException {
- }
-
- @Override
- public void append(List<TransactionEdit> edits) throws IOException {
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public TransactionLogReader getReader() {
- return new TransactionLogReader() {
- @Override
- public TransactionEdit next() {
- return null;
- }
-
- @Override
- public TransactionEdit next(TransactionEdit reuse) {
- return null;
- }
-
- @Override
- public void close() {
- }
- };
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEdit.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEdit.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEdit.java
deleted file mode 100644
index 405bbfd..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEdit.java
+++ /dev/null
@@ -1,360 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.ChangeId;
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TransactionType;
-import com.google.common.base.Objects;
-import com.google.common.collect.Sets;
-import org.apache.hadoop.io.Writable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Set;
-
-/**
- * Represents a transaction state change in the {@link TransactionLog}.
- */
-public class TransactionEdit implements Writable {
-
- /**
- * The possible state changes for a transaction.
- */
- public enum State {
- INPROGRESS, COMMITTING, COMMITTED, INVALID, ABORTED, MOVE_WATERMARK, TRUNCATE_INVALID_TX, CHECKPOINT
- }
-
- private long writePointer;
-
- /**
- * stores the value of visibility upper bound
- * (see {@link TransactionManager.InProgressTx#getVisibilityUpperBound()})
- * for edit of {@link State#INPROGRESS} only
- */
- private long visibilityUpperBound;
- private long commitPointer;
- private long expirationDate;
- private State state;
- private Set<ChangeId> changes;
- /** Whether or not the COMMITTED change should be fully committed. */
- private boolean canCommit;
- private TransactionType type;
- private Set<Long> truncateInvalidTx;
- private long truncateInvalidTxTime;
- private long parentWritePointer;
- private long[] checkpointPointers;
-
- // for Writable
- public TransactionEdit() {
- this.changes = Sets.newHashSet();
- this.truncateInvalidTx = Sets.newHashSet();
- }
-
- // package private for testing
- TransactionEdit(long writePointer, long visibilityUpperBound, State state, long expirationDate,
- Set<ChangeId> changes, long commitPointer, boolean canCommit, TransactionType type,
- Set<Long> truncateInvalidTx, long truncateInvalidTxTime, long parentWritePointer,
- long[] checkpointPointers) {
- this.writePointer = writePointer;
- this.visibilityUpperBound = visibilityUpperBound;
- this.state = state;
- this.expirationDate = expirationDate;
- this.changes = changes != null ? changes : Collections.<ChangeId>emptySet();
- this.commitPointer = commitPointer;
- this.canCommit = canCommit;
- this.type = type;
- this.truncateInvalidTx = truncateInvalidTx != null ? truncateInvalidTx : Collections.<Long>emptySet();
- this.truncateInvalidTxTime = truncateInvalidTxTime;
- this.parentWritePointer = parentWritePointer;
- this.checkpointPointers = checkpointPointers;
- }
-
- /**
- * Returns the transaction write pointer assigned for the state change.
- */
- public long getWritePointer() {
- return writePointer;
- }
-
- void setWritePointer(long writePointer) {
- this.writePointer = writePointer;
- }
-
- public long getVisibilityUpperBound() {
- return visibilityUpperBound;
- }
-
- void setVisibilityUpperBound(long visibilityUpperBound) {
- this.visibilityUpperBound = visibilityUpperBound;
- }
-
- /**
- * Returns the type of state change represented.
- */
- public State getState() {
- return state;
- }
-
- void setState(State state) {
- this.state = state;
- }
-
- /**
- * Returns any expiration timestamp (in milliseconds) associated with the state change. This should only
- * be populated for changes of type {@link State#INPROGRESS}.
- */
- public long getExpiration() {
- return expirationDate;
- }
-
- void setExpiration(long expirationDate) {
- this.expirationDate = expirationDate;
- }
-
- /**
- * @return the set of changed row keys associated with the state change. This is only populated for edits
- * of type {@link State#COMMITTING} or {@link State#COMMITTED}.
- */
- public Set<ChangeId> getChanges() {
- return changes;
- }
-
- void setChanges(Set<ChangeId> changes) {
- this.changes = changes;
- }
-
- /**
- * Returns the write pointer used to commit the row key change set. This is only populated for edits of type
- * {@link State#COMMITTED}.
- */
- public long getCommitPointer() {
- return commitPointer;
- }
-
- void setCommitPointer(long commitPointer) {
- this.commitPointer = commitPointer;
- }
-
- /**
- * Returns whether or not the transaction should be moved to the committed set. This is only populated for edits
- * of type {@link State#COMMITTED}.
- */
- public boolean getCanCommit() {
- return canCommit;
- }
-
- void setCanCommit(boolean canCommit) {
- this.canCommit = canCommit;
- }
-
- /**
- * Returns the transaction type. This is only populated for edits of type {@link State#INPROGRESS} or
- * {@link State#ABORTED}.
- */
- public TransactionType getType() {
- return type;
- }
-
- void setType(TransactionType type) {
- this.type = type;
- }
-
- /**
- * Returns the transaction ids to be removed from invalid transaction list. This is only populated for
- * edits of type {@link State#TRUNCATE_INVALID_TX}
- */
- public Set<Long> getTruncateInvalidTx() {
- return truncateInvalidTx;
- }
-
- void setTruncateInvalidTx(Set<Long> truncateInvalidTx) {
- this.truncateInvalidTx = truncateInvalidTx;
- }
-
- /**
- * Returns the time until which the invalid transactions need to be truncated from invalid transaction list.
- * This is only populated for edits of type {@link State#TRUNCATE_INVALID_TX}
- */
- public long getTruncateInvalidTxTime() {
- return truncateInvalidTxTime;
- }
-
- void setTruncateInvalidTxTime(long truncateInvalidTxTime) {
- this.truncateInvalidTxTime = truncateInvalidTxTime;
- }
-
- /**
- * Returns the parent write pointer for a checkpoint operation. This is only populated for edits of type
- * {@link State#CHECKPOINT}
- */
- public long getParentWritePointer() {
- return parentWritePointer;
- }
-
- void setParentWritePointer(long parentWritePointer) {
- this.parentWritePointer = parentWritePointer;
- }
-
- /**
- * Returns the checkpoint write pointers for the edit. This is only populated for edits of type
- * {@link State#ABORTED}.
- */
- public long[] getCheckpointPointers() {
- return checkpointPointers;
- }
-
- void setCheckpointPointers(long[] checkpointPointers) {
- this.checkpointPointers = checkpointPointers;
- }
-
- /**
- * Creates a new instance in the {@link State#INPROGRESS} state.
- */
- public static TransactionEdit createStarted(long writePointer, long visibilityUpperBound,
- long expirationDate, TransactionType type) {
- return new TransactionEdit(writePointer, visibilityUpperBound, State.INPROGRESS,
- expirationDate, null, 0L, false, type, null, 0L, 0L, null);
- }
-
- /**
- * Creates a new instance in the {@link State#COMMITTING} state.
- */
- public static TransactionEdit createCommitting(long writePointer, Set<ChangeId> changes) {
- return new TransactionEdit(writePointer, 0L, State.COMMITTING, 0L, changes, 0L, false, null, null, 0L, 0L, null);
- }
-
- /**
- * Creates a new instance in the {@link State#COMMITTED} state.
- */
- public static TransactionEdit createCommitted(long writePointer, Set<ChangeId> changes, long nextWritePointer,
- boolean canCommit) {
- return new TransactionEdit(writePointer, 0L, State.COMMITTED, 0L, changes, nextWritePointer, canCommit, null,
- null, 0L, 0L, null);
- }
-
- /**
- * Creates a new instance in the {@link State#ABORTED} state.
- */
- public static TransactionEdit createAborted(long writePointer, TransactionType type, long[] checkpointPointers) {
- return new TransactionEdit(writePointer, 0L, State.ABORTED, 0L, null, 0L, false, type, null, 0L, 0L,
- checkpointPointers);
- }
-
- /**
- * Creates a new instance in the {@link State#INVALID} state.
- */
- public static TransactionEdit createInvalid(long writePointer) {
- return new TransactionEdit(writePointer, 0L, State.INVALID, 0L, null, 0L, false, null, null, 0L, 0L, null);
- }
-
- /**
- * Creates a new instance in the {@link State#MOVE_WATERMARK} state.
- */
- public static TransactionEdit createMoveWatermark(long writePointer) {
- return new TransactionEdit(writePointer, 0L, State.MOVE_WATERMARK, 0L, null, 0L, false, null, null, 0L, 0L, null);
- }
-
- /**
- * Creates a new instance in the {@link State#TRUNCATE_INVALID_TX} state.
- */
- public static TransactionEdit createTruncateInvalidTx(Set<Long> truncateInvalidTx) {
- return new TransactionEdit(0L, 0L, State.TRUNCATE_INVALID_TX, 0L, null, 0L, false, null, truncateInvalidTx,
- 0L, 0L, null);
- }
-
- /**
- * Creates a new instance in the {@link State#TRUNCATE_INVALID_TX} state.
- */
- public static TransactionEdit createTruncateInvalidTxBefore(long truncateInvalidTxTime) {
- return new TransactionEdit(0L, 0L, State.TRUNCATE_INVALID_TX, 0L, null, 0L, false, null, null,
- truncateInvalidTxTime, 0L, null);
- }
-
- /**
- * Creates a new instance in the {@link State#CHECKPOINT} state.
- */
- public static TransactionEdit createCheckpoint(long writePointer, long parentWritePointer) {
- return new TransactionEdit(writePointer, 0L, State.CHECKPOINT, 0L, null, 0L, false, null, null, 0L,
- parentWritePointer, null);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- TransactionEditCodecs.encode(this, out);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- TransactionEditCodecs.decode(this, in);
- }
-
- @Override
- public final boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof TransactionEdit)) {
- return false;
- }
-
- TransactionEdit that = (TransactionEdit) o;
-
- return Objects.equal(this.writePointer, that.writePointer) &&
- Objects.equal(this.visibilityUpperBound, that.visibilityUpperBound) &&
- Objects.equal(this.commitPointer, that.commitPointer) &&
- Objects.equal(this.expirationDate, that.expirationDate) &&
- Objects.equal(this.state, that.state) &&
- Objects.equal(this.changes, that.changes) &&
- Objects.equal(this.canCommit, that.canCommit) &&
- Objects.equal(this.type, that.type) &&
- Objects.equal(this.truncateInvalidTx, that.truncateInvalidTx) &&
- Objects.equal(this.truncateInvalidTxTime, that.truncateInvalidTxTime) &&
- Objects.equal(this.parentWritePointer, that.parentWritePointer) &&
- Arrays.equals(this.checkpointPointers, that.checkpointPointers);
- }
-
- @Override
- public final int hashCode() {
- return Objects.hashCode(writePointer, visibilityUpperBound, commitPointer, expirationDate, state, changes,
- canCommit, type, parentWritePointer, checkpointPointers);
- }
-
- @Override
- public String toString() {
- return Objects.toStringHelper(this)
- .add("writePointer", writePointer)
- .add("visibilityUpperBound", visibilityUpperBound)
- .add("commitPointer", commitPointer)
- .add("expiration", expirationDate)
- .add("state", state)
- .add("changesSize", changes != null ? changes.size() : 0)
- .add("canCommit", canCommit)
- .add("type", type)
- .add("truncateInvalidTx", truncateInvalidTx)
- .add("truncateInvalidTxTime", truncateInvalidTxTime)
- .add("parentWritePointer", parentWritePointer)
- .add("checkpointPointers", checkpointPointers)
- .toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEditCodecs.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEditCodecs.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEditCodecs.java
deleted file mode 100644
index e18b73f..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEditCodecs.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.ChangeId;
-import co.cask.tephra.TransactionType;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Sets;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-/**
- * Utilities to handle encoding and decoding of {@link TransactionEdit} entries, while maintaining compatibility
- * with older versions of the serialized data.
- */
-public class TransactionEditCodecs {
-
- private static final TransactionEditCodec[] ALL_CODECS = {
- new TransactionEditCodecV1(),
- new TransactionEditCodecV2(),
- new TransactionEditCodecV3(),
- new TransactionEditCodecV4()
- };
-
- private static final SortedMap<Byte, TransactionEditCodec> CODECS = new TreeMap<>();
- static {
- for (TransactionEditCodec codec : ALL_CODECS) {
- CODECS.put(codec.getVersion(), codec);
- }
- }
-
- /**
- * Deserializes the encoded data from the given input stream, setting the values as fields
- * on the given {@code TransactionEdit} instances. This method expects first value in the
- * {code DataInput} to be a byte representing the codec version used to serialize the instance.
- *
- * @param dest the transaction edit to populate with the deserialized values
- * @param in the input stream containing the encoded data
- * @throws IOException if an error occurs while deserializing from the input stream
- */
- public static void decode(TransactionEdit dest, DataInput in) throws IOException {
- byte version = in.readByte();
- TransactionEditCodec codec = CODECS.get(version);
- if (codec == null) {
- throw new IOException("TransactionEdit was serialized with an unknown codec version " + version +
- ". Was it written with a newer version of Tephra?");
- }
- codec.decode(dest, in);
- }
-
- /**
- * Serializes the given {@code TransactionEdit} instance with the latest available codec.
- * This will first write out the version of the codec used to serialize the instance so that
- * the correct codec can be used when calling {@link #decode(TransactionEdit, DataInput)}.
- *
- * @param src the transaction edit to serialize
- * @param out the output stream to contain the data
- * @throws IOException if an error occurs while serializing to the output stream
- */
- public static void encode(TransactionEdit src, DataOutput out) throws IOException {
- TransactionEditCodec latestCodec = CODECS.get(CODECS.firstKey());
- out.writeByte(latestCodec.getVersion());
- latestCodec.encode(src, out);
- }
-
- /**
- * Encodes the given transaction edit using a specific codec. Note that this is only exposed
- * for use by tests.
- */
- @VisibleForTesting
- static void encode(TransactionEdit src, DataOutput out, TransactionEditCodec codec) throws IOException {
- out.writeByte(codec.getVersion());
- codec.encode(src, out);
- }
-
- /**
- * Defines the interface used for encoding and decoding {@link TransactionEdit} instances to and from
- * binary representations.
- */
- interface TransactionEditCodec {
- /**
- * Reads the encoded values from the data input stream and sets the fields in the given {@code TransactionEdit}
- * instance.
- *
- * @param dest the instance on which to set all the deserialized values
- * @param in the input stream containing the serialized data
- * @throws IOException if an error occurs while deserializing the data
- */
- void decode(TransactionEdit dest, DataInput in) throws IOException;
-
- /**
- * Writes all the field values from the {@code TransactionEdit} instance in serialized form to the data
- * output stream.
- *
- * @param src the instance to serialize to the stream
- * @param out the output stream to contain the data
- * @throws IOException if an error occurs while serializing the data
- */
- void encode(TransactionEdit src, DataOutput out) throws IOException;
-
- /**
- * Returns the version number for this codec. Each codec should use a unique version number, with the newest
- * codec having the lowest number.
- */
- byte getVersion();
- }
-
-
- // package-private for unit-test access
- static class TransactionEditCodecV1 implements TransactionEditCodec {
- @Override
- public void decode(TransactionEdit dest, DataInput in) throws IOException {
- dest.setWritePointer(in.readLong());
- int stateIdx = in.readInt();
- try {
- dest.setState(TransactionEdit.State.values()[stateIdx]);
- } catch (ArrayIndexOutOfBoundsException e) {
- throw new IOException("State enum ordinal value is out of range: " + stateIdx);
- }
- dest.setExpiration(in.readLong());
- dest.setCommitPointer(in.readLong());
- dest.setCanCommit(in.readBoolean());
- int changeSize = in.readInt();
- Set<ChangeId> changes = Sets.newHashSet();
- for (int i = 0; i < changeSize; i++) {
- int currentLength = in.readInt();
- byte[] currentBytes = new byte[currentLength];
- in.readFully(currentBytes);
- changes.add(new ChangeId(currentBytes));
- }
- dest.setChanges(changes);
- // 1st version did not store this info. It is safe to set firstInProgress to 0, it may decrease performance until
- // this tx is finished, but correctness will be preserved.
- dest.setVisibilityUpperBound(0);
- }
-
- /** @deprecated use {@link TransactionEditCodecs.TransactionEditCodecV4} instead, it is still here for
- * unit-tests only */
- @Override
- @Deprecated
- public void encode(TransactionEdit src, DataOutput out) throws IOException {
- out.writeLong(src.getWritePointer());
- // use ordinal for predictable size, though this does not support evolution
- out.writeInt(src.getState().ordinal());
- out.writeLong(src.getExpiration());
- out.writeLong(src.getCommitPointer());
- out.writeBoolean(src.getCanCommit());
- Set<ChangeId> changes = src.getChanges();
- if (changes == null) {
- out.writeInt(0);
- } else {
- out.writeInt(changes.size());
- for (ChangeId c : changes) {
- byte[] cKey = c.getKey();
- out.writeInt(cKey.length);
- out.write(cKey);
- }
- }
- // NOTE: we didn't have visibilityUpperBound in V1, it was added in V2
- // we didn't have transaction type, truncateInvalidTx and truncateInvalidTxTime in V1 and V2,
- // it was added in V3
- }
-
- @Override
- public byte getVersion() {
- return -1;
- }
- }
-
- // package-private for unit-test access
- static class TransactionEditCodecV2 extends TransactionEditCodecV1 implements TransactionEditCodec {
- @Override
- public void decode(TransactionEdit dest, DataInput in) throws IOException {
- super.decode(dest, in);
- dest.setVisibilityUpperBound(in.readLong());
- }
-
- /** @deprecated use {@link TransactionEditCodecs.TransactionEditCodecV4} instead, it is still here for
- * unit-tests only */
- @Override
- public void encode(TransactionEdit src, DataOutput out) throws IOException {
- super.encode(src, out);
- out.writeLong(src.getVisibilityUpperBound());
- // NOTE: we didn't have transaction type, truncateInvalidTx and truncateInvalidTxTime in V1 and V2,
- // it was added in V3
- }
-
- @Override
- public byte getVersion() {
- return -2;
- }
- }
-
- // TODO: refactor to avoid duplicate code among different version of codecs
- // package-private for unit-test access
- static class TransactionEditCodecV3 extends TransactionEditCodecV2 implements TransactionEditCodec {
- @Override
- public void decode(TransactionEdit dest, DataInput in) throws IOException {
- super.decode(dest, in);
- int typeIdx = in.readInt();
- // null transaction type is represented as -1
- if (typeIdx < 0) {
- dest.setType(null);
- } else {
- try {
- dest.setType(TransactionType.values()[typeIdx]);
- } catch (ArrayIndexOutOfBoundsException e) {
- throw new IOException("Type enum ordinal value is out of range: " + typeIdx);
- }
- }
-
- int truncateInvalidTxSize = in.readInt();
- Set<Long> truncateInvalidTx = emptySet(dest.getTruncateInvalidTx());
- for (int i = 0; i < truncateInvalidTxSize; i++) {
- truncateInvalidTx.add(in.readLong());
- }
- dest.setTruncateInvalidTx(truncateInvalidTx);
- dest.setTruncateInvalidTxTime(in.readLong());
- }
-
- private <T> Set<T> emptySet(Set<T> set) {
- if (set == null) {
- return Sets.newHashSet();
- }
- set.clear();
- return set;
- }
-
- @Override
- public void encode(TransactionEdit src, DataOutput out) throws IOException {
- super.encode(src, out);
- // null transaction type is represented as -1
- if (src.getType() == null) {
- out.writeInt(-1);
- } else {
- out.writeInt(src.getType().ordinal());
- }
-
- Set<Long> truncateInvalidTx = src.getTruncateInvalidTx();
- if (truncateInvalidTx == null) {
- out.writeInt(0);
- } else {
- out.writeInt(truncateInvalidTx.size());
- for (long id : truncateInvalidTx) {
- out.writeLong(id);
- }
- }
- out.writeLong(src.getTruncateInvalidTxTime());
- }
-
- @Override
- public byte getVersion() {
- return -3;
- }
- }
-
- static class TransactionEditCodecV4 extends TransactionEditCodecV3 {
- @Override
- public void decode(TransactionEdit dest, DataInput in) throws IOException {
- super.decode(dest, in);
- dest.setParentWritePointer(in.readLong());
- int checkpointPointersLen = in.readInt();
- if (checkpointPointersLen >= 0) {
- long[] checkpointPointers = new long[checkpointPointersLen];
- for (int i = 0; i < checkpointPointersLen; i++) {
- checkpointPointers[i] = in.readLong();
- }
- dest.setCheckpointPointers(checkpointPointers);
- }
- }
-
- @Override
- public void encode(TransactionEdit src, DataOutput out) throws IOException {
- super.encode(src, out);
- out.writeLong(src.getParentWritePointer());
- long[] checkpointPointers = src.getCheckpointPointers();
- if (checkpointPointers == null) {
- out.writeInt(-1);
- } else {
- out.writeInt(checkpointPointers.length);
- for (int i = 0; i < checkpointPointers.length; i++) {
- out.writeLong(checkpointPointers[i]);
- }
- }
- }
-
- @Override
- public byte getVersion() {
- return -4;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLog.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLog.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLog.java
deleted file mode 100644
index f76a8f1..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLog.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Represents a log of transaction state changes.
- */
-public interface TransactionLog {
-
- String getName();
-
- long getTimestamp();
-
- void append(TransactionEdit edit) throws IOException;
-
- void append(List<TransactionEdit> edits) throws IOException;
-
- void close() throws IOException;
-
- TransactionLogReader getReader() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLogReader.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLogReader.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLogReader.java
deleted file mode 100644
index 3a3eaca..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLogReader.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * Represents a reader for {@link TransactionLog} instances.
- */
-public interface TransactionLogReader extends Closeable {
- /**
- * Returns the next {@code TransactionEdit} from the log file, based on the current position, or {@code null}
- * if the end of the file has been reached.
- */
- TransactionEdit next() throws IOException;
-
- /**
- * Populates {@code reuse} with the next {@code TransactionEdit}, based on the reader's current position in the
- * log file.
- * @param reuse The {@code TransactionEdit} instance to populate with the log entry data.
- * @return The {@code TransactionEdit} instance, or {@code null} if the end of the file has been reached.
- * @throws IOException If an error is encountered reading the log data.
- */
- TransactionEdit next(TransactionEdit reuse) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLogWriter.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLogWriter.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLogWriter.java
deleted file mode 100644
index 67d9aaf..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLogWriter.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * Common interface for transaction log writers used by classes extending {@link AbstractTransactionLog}.
- */
-public interface TransactionLogWriter extends Closeable {
- /**
- * Adds a new transaction entry to the log. Note that this does not guarantee that the entry has been flushed
- * to persistent storage until {@link #sync()} has been called.
- *
- * @param entry The transaction edit to append.
- * @throws IOException If an error occurs while writing the edit to storage.
- */
- void append(AbstractTransactionLog.Entry entry) throws IOException;
-
- /**
- * Makes an entry of number of transaction entries that will follow in that log in a single sync.
- *
- * @param count Number of transaction entries.
- * @throws IOException If an error occurs while writing the count to storage.
- */
- void commitMarker(int count) throws IOException;
-
- /**
- * Syncs any pending transaction edits added through {@link #append(AbstractTransactionLog.Entry)},
- * but not yet flushed to durable storage.
- *
- * @throws IOException If an error occurs while flushing the outstanding edits.
- */
- void sync() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/TransactionSnapshot.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionSnapshot.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionSnapshot.java
deleted file mode 100644
index 22bc449..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionSnapshot.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.ChangeId;
-import co.cask.tephra.TransactionManager;
-import com.google.common.base.Objects;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
-import java.util.TreeMap;
-
-/**
- * Represents an in-memory snapshot of the full transaction state.
- */
-public class TransactionSnapshot implements TransactionVisibilityState {
- private long timestamp;
- private long readPointer;
- private long writePointer;
- private Collection<Long> invalid;
- private NavigableMap<Long, TransactionManager.InProgressTx> inProgress;
- private Map<Long, Set<ChangeId>> committingChangeSets;
- private Map<Long, Set<ChangeId>> committedChangeSets;
-
- public TransactionSnapshot(long timestamp, long readPointer, long writePointer, Collection<Long> invalid,
- NavigableMap<Long, TransactionManager.InProgressTx> inProgress,
- Map<Long, Set<ChangeId>> committing, Map<Long, Set<ChangeId>> committed) {
- this(timestamp, readPointer, writePointer, invalid, inProgress);
- this.committingChangeSets = committing;
- this.committedChangeSets = committed;
- }
-
- public TransactionSnapshot(long timestamp, long readPointer, long writePointer, Collection<Long> invalid,
- NavigableMap<Long, TransactionManager.InProgressTx> inProgress) {
- this.timestamp = timestamp;
- this.readPointer = readPointer;
- this.writePointer = writePointer;
- this.invalid = invalid;
- this.inProgress = inProgress;
- this.committingChangeSets = Collections.emptyMap();
- this.committedChangeSets = Collections.emptyMap();
- }
-
- @Override
- public long getTimestamp() {
- return timestamp;
- }
-
- @Override
- public long getReadPointer() {
- return readPointer;
- }
-
- @Override
- public long getWritePointer() {
- return writePointer;
- }
-
- @Override
- public Collection<Long> getInvalid() {
- return invalid;
- }
-
- @Override
- public NavigableMap<Long, TransactionManager.InProgressTx> getInProgress() {
- return inProgress;
- }
-
- @Override
- public long getVisibilityUpperBound() {
- // the readPointer of the oldest in-progress tx is the oldest in use
- // todo: potential problem with not moving visibility upper bound for the whole duration of long-running tx
- Map.Entry<Long, TransactionManager.InProgressTx> firstInProgress = inProgress.firstEntry();
- if (firstInProgress == null) {
- // using readPointer as smallest visible when non txs are there
- return readPointer;
- }
- return firstInProgress.getValue().getVisibilityUpperBound();
- }
-
- /**
- * Returns a map of transaction write pointer to sets of changed row keys for transactions that had called
- * {@code InMemoryTransactionManager.canCommit(Transaction, Collection)} but not yet called
- * {@code InMemoryTransactionManager.commit(Transaction)} at the time of the snapshot.
- *
- * @return a map of transaction write pointer to set of changed row keys.
- */
- public Map<Long, Set<ChangeId>> getCommittingChangeSets() {
- return committingChangeSets;
- }
-
- /**
- * Returns a map of transaction write pointer to set of changed row keys for transaction that had successfully called
- * {@code InMemoryTransactionManager.commit(Transaction)} at the time of the snapshot.
- *
- * @return a map of transaction write pointer to set of changed row keys.
- */
- public Map<Long, Set<ChangeId>> getCommittedChangeSets() {
- return committedChangeSets;
- }
-
- /**
- * Checks that this instance matches another {@code TransactionSnapshot} instance. Note that the equality check
- * ignores the snapshot timestamp value, but includes all other properties.
- *
- * @param obj the other instance to check for equality.
- * @return {@code true} if the instances are equal, {@code false} if not.
- */
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof TransactionSnapshot)) {
- return false;
- }
- TransactionSnapshot other = (TransactionSnapshot) obj;
- return readPointer == other.readPointer &&
- writePointer == other.writePointer &&
- invalid.equals(other.invalid) &&
- inProgress.equals(other.inProgress) &&
- committingChangeSets.equals(other.committingChangeSets) &&
- committedChangeSets.equals(other.committedChangeSets);
- }
-
- @Override
- public String toString() {
- return Objects.toStringHelper(this)
- .add("timestamp", timestamp)
- .add("readPointer", readPointer)
- .add("writePointer", writePointer)
- .add("invalidSize", invalid.size())
- .add("inProgressSize", inProgress.size())
- .add("committingSize", committingChangeSets.size())
- .add("committedSize", committedChangeSets.size())
- .toString();
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(readPointer, writePointer, invalid, inProgress, committingChangeSets, committedChangeSets);
- }
-
- /**
- * Creates a new {@code TransactionSnapshot} instance with copies of all of the individual collections.
- * @param readPointer current transaction read pointer
- * @param writePointer current transaction write pointer
- * @param invalid current list of invalid write pointers
- * @param inProgress current map of in-progress write pointers to expiration timestamps
- * @param committing current map of write pointers to change sets which have passed {@code canCommit()} but not
- * yet committed
- * @param committed current map of write pointers to change sets which have committed
- * @return a new {@code TransactionSnapshot} instance
- */
- public static TransactionSnapshot copyFrom(long snapshotTime, long readPointer,
- long writePointer, Collection<Long> invalid,
- NavigableMap<Long, TransactionManager.InProgressTx> inProgress,
- Map<Long, Set<ChangeId>> committing,
- NavigableMap<Long, Set<ChangeId>> committed) {
- // copy invalid IDs
- Collection<Long> invalidCopy = Lists.newArrayList(invalid);
- // copy in-progress IDs and expirations
- NavigableMap<Long, TransactionManager.InProgressTx> inProgressCopy = Maps.newTreeMap(inProgress);
-
- // for committing and committed maps, we need to copy each individual Set as well to prevent modification
- Map<Long, Set<ChangeId>> committingCopy = Maps.newHashMap();
- for (Map.Entry<Long, Set<ChangeId>> entry : committing.entrySet()) {
- committingCopy.put(entry.getKey(), new HashSet<>(entry.getValue()));
- }
-
- NavigableMap<Long, Set<ChangeId>> committedCopy = new TreeMap<>();
- for (Map.Entry<Long, Set<ChangeId>> entry : committed.entrySet()) {
- committedCopy.put(entry.getKey(), new HashSet<>(entry.getValue()));
- }
-
- return new TransactionSnapshot(snapshotTime, readPointer, writePointer,
- invalidCopy, inProgressCopy, committingCopy, committedCopy);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/TransactionStateStorage.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionStateStorage.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionStateStorage.java
deleted file mode 100644
index 0acb9bb..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionStateStorage.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import com.google.common.util.concurrent.Service;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.List;
-
-/**
- * Defines the common contract for persisting transaction state changes.
- */
-public interface TransactionStateStorage extends Service {
-
- /**
- * Persists a snapshot of transaction state to an output stream.
- */
- public void writeSnapshot(OutputStream out, TransactionSnapshot snapshot) throws IOException;
-
- /**
- * Persists a snapshot of transaction state.
- */
- public void writeSnapshot(TransactionSnapshot snapshot) throws IOException;
-
- /**
- * Returns the most recent snapshot that has been successfully written. Note that this may return {@code null}
- * if no completed snapshot files are found.
- */
- public TransactionSnapshot getLatestSnapshot() throws IOException;
-
- /**
- * Returns the most recent transaction visibility state that has been successfully written.
- * Note that this may return {@code null} if no completed snapshot files are found.
- * @return {@link TransactionVisibilityState}
- */
- public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException;
-
- /**
- * Removes any snapshots prior to the {@code numberToKeep} most recent.
- *
- * @param numberToKeep The number of most recent snapshots to keep.
- * @throws IOException If an error occurs while deleting old snapshots.
- * @return The timestamp of the oldest snapshot kept.
- */
- public long deleteOldSnapshots(int numberToKeep) throws IOException;
-
- /**
- * Returns the (non-qualified) names of available snapshots.
- */
- public List<String> listSnapshots() throws IOException;
-
- /**
- * Returns all {@link TransactionLog}s with a timestamp greater than or equal to the given timestamp. Note that
- * the returned list is guaranteed to be sorted in ascending timestamp order.
- */
- public List<TransactionLog> getLogsSince(long timestamp) throws IOException;
-
- /**
- * Creates a new {@link TransactionLog}.
- */
- public TransactionLog createLog(long timestamp) throws IOException;
-
- /**
- * Returns the (non-qualified) names of available logs.
- */
- public List<String> listLogs() throws IOException;
-
- /**
- * Removes any transaction logs with a timestamp older than the given value. Logs must be removed based on timestamp
- * to ensure we can fully recover state based on a given snapshot.
- * @param timestamp The timestamp to delete up to. Logs with a timestamp less than this value will be removed.
- * @throws IOException If an error occurs while removing logs.
- */
- public void deleteLogsOlderThan(long timestamp) throws IOException;
-
- /**
- * Create the directories required for the transaction state stage.
- * @throws IOException If an error occurred during the creation of required directories for transaction state storage.
- */
- public void setupStorage() throws IOException;
-
- /**
- * Returns a string representation of the location used for persistence.
- */
- public String getLocation();
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/TransactionVisibilityState.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionVisibilityState.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionVisibilityState.java
deleted file mode 100644
index 68d17c3..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionVisibilityState.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.TransactionManager;
-
-import java.util.Collection;
-import java.util.NavigableMap;
-
-/**
- * Transaction Visibility state contains information required by TransactionProcessor CoProcessor
- * to determine cell visibility.
- */
-public interface TransactionVisibilityState {
-
- /**
- * Returns the timestamp from when this snapshot was created.
- */
- long getTimestamp();
-
- /**
- * Returns the read pointer at the time of the snapshot.
- */
- long getReadPointer();
-
- /**
- * Returns the next write pointer at the time of the snapshot.
- */
- long getWritePointer();
-
- /**
- * Returns the list of invalid write pointers at the time of the snapshot.
- */
- Collection<Long> getInvalid();
-
- /**
- * Returns the map of write pointers to in-progress transactions at the time of the snapshot.
- */
- NavigableMap<Long, TransactionManager.InProgressTx> getInProgress();
-
- /**
- * @return transaction id {@code X} such that any of the transactions newer than {@code X} might be invisible to
- * some of the currently in-progress transactions or to those that will be started <p>
- * NOTE: the returned tx id can be invalid.
- */
- long getVisibilityUpperBound();
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/package-info.java b/tephra-core/src/main/java/co/cask/tephra/persist/package-info.java
deleted file mode 100644
index c94f0fe..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package contains interfaces and implementations for persisting transaction state.
- */
-package co.cask.tephra.persist;
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/rpc/RPCServiceHandler.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/rpc/RPCServiceHandler.java b/tephra-core/src/main/java/co/cask/tephra/rpc/RPCServiceHandler.java
deleted file mode 100644
index 664bfe3..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/rpc/RPCServiceHandler.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package co.cask.tephra.rpc;
-
-/**
- * Defines lifecycle interface for all rpc handlers.
- */
-public interface RPCServiceHandler {
-
- void init() throws Exception;
-
- void destroy() throws Exception;
-}