You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by go...@apache.org on 2016/05/13 01:26:52 UTC

[1/2] incubator-tephra git commit: Copying old classes to provide backward compatibility with TransactionEdit

Repository: incubator-tephra
Updated Branches:
  refs/heads/master ab5249128 -> 3918b8a8c


Copying old classes to provide backward compatibility with TransactionEdit

Create a v3 TxLogReader and deprecate v2 which contains logs written with old cask classes

Fix BalanceBooks example run command

Adding Deprecated annotations - addressing PR comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/97602a0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/97602a0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/97602a0e

Branch: refs/heads/master
Commit: 97602a0e176881a21d7cb2c01cf116a4eb55c638
Parents: ab52491
Author: Gokul Gunasekaran <go...@cask.co>
Authored: Thu May 12 10:57:04 2016 -0700
Committer: Gokul Gunasekaran <go...@apache.org>
Committed: Thu May 12 18:10:37 2016 -0700

----------------------------------------------------------------------
 .../co/cask/tephra/persist/TransactionEdit.java | 364 +++++++++++++++++++
 .../tephra/persist/TransactionEditCodecs.java   | 315 ++++++++++++++++
 .../java/org/apache/tephra/TxConstants.java     |   2 +-
 .../tephra/persist/AbstractTransactionLog.java  |  41 +++
 .../HDFSTransactionLogReaderSupplier.java       |   3 +
 .../persist/HDFSTransactionLogReaderV1.java     |   5 +-
 .../persist/HDFSTransactionLogReaderV2.java     |   8 +-
 .../persist/HDFSTransactionLogReaderV3.java     | 114 ++++++
 .../apache/tephra/persist/TransactionEdit.java  |  15 +
 .../tephra/persist/HDFSTransactionLogTest.java  | 127 ++++++-
 .../apache/tephra/util/TransactionEditUtil.java |  38 +-
 .../apache/tephra/examples/BalanceBooks.java    |   2 +-
 12 files changed, 996 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/97602a0e/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEdit.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEdit.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEdit.java
new file mode 100644
index 0000000..95b07d7
--- /dev/null
+++ b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEdit.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package co.cask.tephra.persist;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.io.Writable;
+import org.apache.tephra.ChangeId;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionType;
+import org.apache.tephra.persist.TransactionLog;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Represents a transaction state change in the {@link TransactionLog}.
+ * This class was included for backward compatibility reasons. It will be removed in future releases.
+ */
+@Deprecated
+public class TransactionEdit implements Writable {
+
+  /**
+   * The possible state changes for a transaction.
+   */
+  public enum State {
+    INPROGRESS, COMMITTING, COMMITTED, INVALID, ABORTED, MOVE_WATERMARK, TRUNCATE_INVALID_TX, CHECKPOINT
+  }
+
+  private long writePointer;
+
+  /**
+   * stores the value of visibility upper bound
+   * (see {@link TransactionManager.InProgressTx#getVisibilityUpperBound()})
+   * for edit of {@link State#INPROGRESS} only
+   */
+  private long visibilityUpperBound;
+  private long commitPointer;
+  private long expirationDate;
+  private State state;
+  private Set<ChangeId> changes;
+  /** Whether or not the COMMITTED change should be fully committed. */
+  private boolean canCommit;
+  private TransactionType type;
+  private Set<Long> truncateInvalidTx;
+  private long truncateInvalidTxTime;
+  private long parentWritePointer;
+  private long[] checkpointPointers;
+
+  // for Writable
+  public TransactionEdit() {
+    this.changes = Sets.newHashSet();
+    this.truncateInvalidTx = Sets.newHashSet();
+  }
+
+  // package private for testing
+  TransactionEdit(long writePointer, long visibilityUpperBound, State state, long expirationDate,
+                  Set<ChangeId> changes, long commitPointer, boolean canCommit, TransactionType type,
+                  Set<Long> truncateInvalidTx, long truncateInvalidTxTime, long parentWritePointer,
+                  long[] checkpointPointers) {
+    this.writePointer = writePointer;
+    this.visibilityUpperBound = visibilityUpperBound;
+    this.state = state;
+    this.expirationDate = expirationDate;
+    this.changes = changes != null ? changes : Collections.<ChangeId>emptySet();
+    this.commitPointer = commitPointer;
+    this.canCommit = canCommit;
+    this.type = type;
+    this.truncateInvalidTx = truncateInvalidTx != null ? truncateInvalidTx : Collections.<Long>emptySet();
+    this.truncateInvalidTxTime = truncateInvalidTxTime;
+    this.parentWritePointer = parentWritePointer;
+    this.checkpointPointers = checkpointPointers;
+  }
+
+  /**
+   * Returns the transaction write pointer assigned for the state change.
+   */
+  public long getWritePointer() {
+    return writePointer;
+  }
+
+  void setWritePointer(long writePointer) {
+    this.writePointer = writePointer;
+  }
+
+  public long getVisibilityUpperBound() {
+    return visibilityUpperBound;
+  }
+
+  void setVisibilityUpperBound(long visibilityUpperBound) {
+    this.visibilityUpperBound = visibilityUpperBound;
+  }
+
+  /**
+   * Returns the type of state change represented.
+   */
+  public State getState() {
+    return state;
+  }
+
+  void setState(State state) {
+    this.state = state;
+  }
+
+  /**
+   * Returns any expiration timestamp (in milliseconds) associated with the state change.  This should only
+   * be populated for changes of type {@link State#INPROGRESS}.
+   */
+  public long getExpiration() {
+    return expirationDate;
+  }
+
+  void setExpiration(long expirationDate) {
+    this.expirationDate = expirationDate;
+  }
+
+  /**
+   * @return the set of changed row keys associated with the state change.  This is only populated for edits
+   * of type {@link State#COMMITTING} or {@link State#COMMITTED}.
+   */
+  public Set<ChangeId> getChanges() {
+    return changes;
+  }
+
+  void setChanges(Set<ChangeId> changes) {
+    this.changes = changes;
+  }
+
+  /**
+   * Returns the write pointer used to commit the row key change set.  This is only populated for edits of type
+   * {@link State#COMMITTED}.
+   */
+  public long getCommitPointer() {
+    return commitPointer;
+  }
+
+  void setCommitPointer(long commitPointer) {
+    this.commitPointer = commitPointer;
+  }
+
+  /**
+   * Returns whether or not the transaction should be moved to the committed set.  This is only populated for edits
+   * of type {@link State#COMMITTED}.
+   */
+  public boolean getCanCommit() {
+    return canCommit;
+  }
+
+  void setCanCommit(boolean canCommit) {
+    this.canCommit = canCommit;
+  }
+
+  /**
+   * Returns the transaction type. This is only populated for edits of type {@link State#INPROGRESS} or
+   * {@link State#ABORTED}.
+   */
+  public TransactionType getType() {
+    return type;
+  }
+
+  void setType(TransactionType type) {
+    this.type = type;
+  }
+
+  /**
+   * Returns the transaction ids to be removed from invalid transaction list. This is only populated for
+   * edits of type {@link State#TRUNCATE_INVALID_TX}
+   */
+  public Set<Long> getTruncateInvalidTx() {
+    return truncateInvalidTx;
+  }
+
+  void setTruncateInvalidTx(Set<Long> truncateInvalidTx) {
+    this.truncateInvalidTx = truncateInvalidTx;
+  }
+
+  /**
+   * Returns the time until which the invalid transactions need to be truncated from invalid transaction list.
+   * This is only populated for  edits of type {@link State#TRUNCATE_INVALID_TX}
+   */
+  public long getTruncateInvalidTxTime() {
+    return truncateInvalidTxTime;
+  }
+
+  void setTruncateInvalidTxTime(long truncateInvalidTxTime) {
+    this.truncateInvalidTxTime = truncateInvalidTxTime;
+  }
+
+  /**
+   * Returns the parent write pointer for a checkpoint operation.  This is only populated for edits of type
+   * {@link State#CHECKPOINT}
+   */
+  public long getParentWritePointer() {
+    return parentWritePointer;
+  }
+
+  void setParentWritePointer(long parentWritePointer) {
+    this.parentWritePointer = parentWritePointer;
+  }
+
+  /**
+   * Returns the checkpoint write pointers for the edit.  This is only populated for edits of type
+   * {@link State#ABORTED}.
+   */
+  public long[] getCheckpointPointers() {
+    return checkpointPointers;
+  }
+
+  void setCheckpointPointers(long[] checkpointPointers) {
+    this.checkpointPointers = checkpointPointers;
+  }
+
+  /**
+   * Creates a new instance in the {@link State#INPROGRESS} state.
+   */
+  public static TransactionEdit createStarted(long writePointer, long visibilityUpperBound,
+                                              long expirationDate, TransactionType type) {
+    return new TransactionEdit(writePointer, visibilityUpperBound, State.INPROGRESS,
+                               expirationDate, null, 0L, false, type, null, 0L, 0L, null);
+  }
+
+  /**
+   * Creates a new instance in the {@link State#COMMITTING} state.
+   */
+  public static TransactionEdit createCommitting(long writePointer, Set<ChangeId> changes) {
+    return new TransactionEdit(writePointer, 0L, State.COMMITTING, 0L, changes, 0L, false, null, null, 0L, 0L, null);
+  }
+
+  /**
+   * Creates a new instance in the {@link State#COMMITTED} state.
+   */
+  public static TransactionEdit createCommitted(long writePointer, Set<ChangeId> changes, long nextWritePointer,
+                                                boolean canCommit) {
+    return new TransactionEdit(writePointer, 0L, State.COMMITTED, 0L, changes, nextWritePointer, canCommit, null,
+                               null, 0L, 0L, null);
+  }
+
+  /**
+   * Creates a new instance in the {@link State#ABORTED} state.
+   */
+  public static TransactionEdit createAborted(long writePointer, TransactionType type, long[] checkpointPointers) {
+    return new TransactionEdit(writePointer, 0L, State.ABORTED, 0L, null, 0L, false, type, null, 0L, 0L,
+                               checkpointPointers);
+  }
+
+  /**
+   * Creates a new instance in the {@link State#INVALID} state.
+   */
+  public static TransactionEdit createInvalid(long writePointer) {
+    return new TransactionEdit(writePointer, 0L, State.INVALID, 0L, null, 0L, false, null, null, 0L, 0L, null);
+  }
+
+  /**
+   * Creates a new instance in the {@link State#MOVE_WATERMARK} state.
+   */
+  public static TransactionEdit createMoveWatermark(long writePointer) {
+    return new TransactionEdit(writePointer, 0L, State.MOVE_WATERMARK, 0L, null, 0L, false, null, null, 0L, 0L, null);
+  }
+
+  /**
+   * Creates a new instance in the {@link State#TRUNCATE_INVALID_TX} state.
+   */
+  public static TransactionEdit createTruncateInvalidTx(Set<Long> truncateInvalidTx) {
+    return new TransactionEdit(0L, 0L, State.TRUNCATE_INVALID_TX, 0L, null, 0L, false, null, truncateInvalidTx,
+                               0L, 0L, null);
+  }
+
+  /**
+   * Creates a new instance in the {@link State#TRUNCATE_INVALID_TX} state.
+   */
+  public static TransactionEdit createTruncateInvalidTxBefore(long truncateInvalidTxTime) {
+    return new TransactionEdit(0L, 0L, State.TRUNCATE_INVALID_TX, 0L, null, 0L, false, null, null,
+                               truncateInvalidTxTime, 0L, null);
+  }
+
+  /**
+   * Creates a new instance in the {@link State#CHECKPOINT} state.
+   */
+  public static TransactionEdit createCheckpoint(long writePointer, long parentWritePointer) {
+    return new TransactionEdit(writePointer, 0L, State.CHECKPOINT, 0L, null, 0L, false, null, null, 0L,
+                               parentWritePointer, null);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    TransactionEditCodecs.encode(this, out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    TransactionEditCodecs.decode(this, in);
+  }
+
+  /**
+   * Two edits are equal when every field matches; the checkpoint pointer arrays are compared by content.
+   *
+   * @param o the object to compare with this edit
+   * @return {@code true} if {@code o} is a {@code TransactionEdit} with identical field values
+   */
+  @Override
+  public final boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o instanceof TransactionEdit) {
+      TransactionEdit other = (TransactionEdit) o;
+      // primitives and enum constants are compared directly; only the sets need a null-safe equals
+      return writePointer == other.writePointer &&
+        visibilityUpperBound == other.visibilityUpperBound &&
+        commitPointer == other.commitPointer &&
+        expirationDate == other.expirationDate &&
+        state == other.state &&
+        Objects.equal(changes, other.changes) &&
+        canCommit == other.canCommit &&
+        type == other.type &&
+        Objects.equal(truncateInvalidTx, other.truncateInvalidTx) &&
+        truncateInvalidTxTime == other.truncateInvalidTxTime &&
+        parentWritePointer == other.parentWritePointer &&
+        Arrays.equals(checkpointPointers, other.checkpointPointers);
+    }
+    return false;
+  }
+
+  /**
+   * Computes a hash code that is consistent with {@link #equals(Object)}.
+   *
+   * @return a hash derived from the field values of this edit
+   */
+  @Override
+  public final int hashCode() {
+    // equals() compares checkpointPointers with Arrays.equals, so the array must be hashed by content;
+    // hashing the array reference would give equal edits different hash codes
+    return Objects.hashCode(writePointer, visibilityUpperBound, commitPointer, expirationDate, state, changes,
+                            canCommit, type, parentWritePointer, Arrays.hashCode(checkpointPointers));
+  }
+
+  /**
+   * Returns a human-readable summary of this edit.  The change set is reported by size only, and the
+   * checkpoint pointers are rendered element by element.
+   */
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("writePointer", writePointer)
+      .add("visibilityUpperBound", visibilityUpperBound)
+      .add("commitPointer", commitPointer)
+      .add("expiration", expirationDate)
+      .add("state", state)
+      .add("changesSize", changes != null ? changes.size() : 0)
+      .add("canCommit", canCommit)
+      .add("type", type)
+      .add("truncateInvalidTx", truncateInvalidTx)
+      .add("truncateInvalidTxTime", truncateInvalidTxTime)
+      .add("parentWritePointer", parentWritePointer)
+      // render the array contents explicitly; handing over the long[] itself may print only its identity
+      .add("checkpointPointers", Arrays.toString(checkpointPointers))
+      .toString();
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/97602a0e/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEditCodecs.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEditCodecs.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEditCodecs.java
new file mode 100644
index 0000000..8bea70f
--- /dev/null
+++ b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEditCodecs.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package co.cask.tephra.persist;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+import org.apache.tephra.ChangeId;
+import org.apache.tephra.TransactionType;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * Utilities to handle encoding and decoding of {@link TransactionEdit} entries, while maintaining compatibility
+ * with older versions of the serialized data. This class was included for backward compatibility reasons.
+ * It will be removed in future releases.
+ */
+@Deprecated
+public class TransactionEditCodecs {
+
+  private static final TransactionEditCodec[] ALL_CODECS = {
+      new TransactionEditCodecV1(),
+      new TransactionEditCodecV2(),
+      new TransactionEditCodecV3(),
+      new TransactionEditCodecV4()
+  };
+
+  private static final SortedMap<Byte, TransactionEditCodec> CODECS = new TreeMap<>();
+  static {
+    for (TransactionEditCodec codec : ALL_CODECS) {
+      CODECS.put(codec.getVersion(), codec);
+    }
+  }
+
+  /**
+   * Populates the given {@code TransactionEdit} from its serialized form.  The first byte read from the
+   * {@code DataInput} is taken as the version of the codec that wrote the entry; the rest of the entry is
+   * then handed to the codec registered under that version.
+   *
+   * @param dest the transaction edit whose fields are set from the deserialized values
+   * @param in the input stream positioned at the version byte of an encoded edit
+   * @throws IOException if no codec is registered for the version byte, or if reading from the stream fails
+   */
+  public static void decode(TransactionEdit dest, DataInput in) throws IOException {
+    byte codecVersion = in.readByte();
+    TransactionEditCodec matching = CODECS.get(codecVersion);
+    if (matching != null) {
+      matching.decode(dest, in);
+      return;
+    }
+    throw new IOException("TransactionEdit was serialized with an unknown codec version " + codecVersion +
+        ". Was it written with a newer version of Tephra?");
+  }
+
+  /**
+   * Writes the given {@code TransactionEdit} using the codec stored under the first key of the codec map.
+   * The codec's version byte is written ahead of the encoded fields so that
+   * {@link #decode(TransactionEdit, DataInput)} can select the matching codec when reading the entry back.
+   *
+   * @param src the transaction edit to serialize
+   * @param out the output stream that receives the version byte followed by the encoded edit
+   * @throws IOException if writing to the output stream fails
+   */
+  public static void encode(TransactionEdit src, DataOutput out) throws IOException {
+    Byte newestVersion = CODECS.firstKey();
+    TransactionEditCodec newest = CODECS.get(newestVersion);
+    out.writeByte(newest.getVersion());
+    newest.encode(src, out);
+  }
+
+  /**
+   * Encodes the given transaction edit using a specific codec.  Note that this is only exposed
+   * for use by tests.
+   */
+  @VisibleForTesting
+  static void encode(TransactionEdit src, DataOutput out, TransactionEditCodec codec) throws IOException {
+    out.writeByte(codec.getVersion());
+    codec.encode(src, out);
+  }
+
+  /**
+   * Defines the interface used for encoding and decoding {@link TransactionEdit} instances to and from
+   * binary representations.
+   */
+  interface TransactionEditCodec {
+    /**
+     * Reads the encoded values from the data input stream and sets the fields in the given {@code TransactionEdit}
+     * instance.
+     *
+     * @param dest the instance on which to set all the deserialized values
+     * @param in the input stream containing the serialized data
+     * @throws IOException if an error occurs while deserializing the data
+     */
+    void decode(TransactionEdit dest, DataInput in) throws IOException;
+
+    /**
+     * Writes all the field values from the {@code TransactionEdit} instance in serialized form to the data
+     * output stream.
+     *
+     * @param src the instance to serialize to the stream
+     * @param out the output stream to contain the data
+     * @throws IOException if an error occurs while serializing the data
+     */
+    void encode(TransactionEdit src, DataOutput out) throws IOException;
+
+    /**
+     * Returns the version number for this codec.  Each codec should use a unique version number, with the newest
+     * codec having the lowest number.
+     */
+    byte getVersion();
+  }
+
+
+  // package-private for unit-test access
+  static class TransactionEditCodecV1 implements TransactionEditCodec {
+    /**
+     * Reads an entry in the V1 layout: write pointer, state ordinal, expiration, commit pointer, the
+     * canCommit flag, and the change set (a count followed by length-prefixed keys).
+     *
+     * @param dest the instance on which to set the deserialized values
+     * @param in the input stream containing the serialized data
+     * @throws IOException if the state ordinal or a key length is out of range, or if reading fails
+     */
+    @Override
+    public void decode(TransactionEdit dest, DataInput in) throws IOException {
+      dest.setWritePointer(in.readLong());
+      int stateIdx = in.readInt();
+      TransactionEdit.State[] states = TransactionEdit.State.values();
+      // validate the ordinal up front rather than using ArrayIndexOutOfBoundsException as control flow
+      if (stateIdx < 0 || stateIdx >= states.length) {
+        throw new IOException("State enum ordinal value is out of range: " + stateIdx);
+      }
+      dest.setState(states[stateIdx]);
+      dest.setExpiration(in.readLong());
+      dest.setCommitPointer(in.readLong());
+      dest.setCanCommit(in.readBoolean());
+      int changeSize = in.readInt();
+      Set<ChangeId> changes = Sets.newHashSet();
+      for (int i = 0; i < changeSize; i++) {
+        int currentLength = in.readInt();
+        // a negative length would otherwise surface as an unchecked NegativeArraySizeException
+        if (currentLength < 0) {
+          throw new IOException("Change key length is out of range: " + currentLength);
+        }
+        byte[] currentBytes = new byte[currentLength];
+        in.readFully(currentBytes);
+        changes.add(new ChangeId(currentBytes));
+      }
+      dest.setChanges(changes);
+      // 1st version did not store this info. It is safe to set firstInProgress to 0, it may decrease performance until
+      // this tx is finished, but correctness will be preserved.
+      dest.setVisibilityUpperBound(0);
+    }
+
+    /** @deprecated use {@link TransactionEditCodecs.TransactionEditCodecV4} instead, it is still here for
+     *  unit-tests only */
+    @Override
+    @Deprecated
+    public void encode(TransactionEdit src, DataOutput out) throws IOException {
+      out.writeLong(src.getWritePointer());
+      // use ordinal for predictable size, though this does not support evolution
+      out.writeInt(src.getState().ordinal());
+      out.writeLong(src.getExpiration());
+      out.writeLong(src.getCommitPointer());
+      out.writeBoolean(src.getCanCommit());
+      Set<ChangeId> changes = src.getChanges();
+      if (changes == null) {
+        out.writeInt(0);
+      } else {
+        out.writeInt(changes.size());
+        for (ChangeId c : changes) {
+          byte[] cKey = c.getKey();
+          out.writeInt(cKey.length);
+          out.write(cKey);
+        }
+      }
+      // NOTE: we didn't have visibilityUpperBound in V1, it was added in V2
+      // we didn't have transaction type, truncateInvalidTx and truncateInvalidTxTime in V1 and V2,
+      // it was added in V3
+    }
+
+    @Override
+    public byte getVersion() {
+      return -1;
+    }
+  }
+
+  // package-private for unit-test access
+  static class TransactionEditCodecV2 extends TransactionEditCodecV1 implements TransactionEditCodec {
+    @Override
+    public void decode(TransactionEdit dest, DataInput in) throws IOException {
+      super.decode(dest, in);
+      dest.setVisibilityUpperBound(in.readLong());
+    }
+
+    /** @deprecated use {@link TransactionEditCodecs.TransactionEditCodecV4} instead, it is still here for
+     *  unit-tests only */
+    @Override
+    public void encode(TransactionEdit src, DataOutput out) throws IOException {
+      super.encode(src, out);
+      out.writeLong(src.getVisibilityUpperBound());
+      // NOTE: we didn't have transaction type, truncateInvalidTx and truncateInvalidTxTime in V1 and V2,
+      // it was added in V3
+    }
+
+    @Override
+    public byte getVersion() {
+      return -2;
+    }
+  }
+
+  // TODO: refactor to avoid duplicate code among different version of codecs
+  // package-private for unit-test access
+  static class TransactionEditCodecV3 extends TransactionEditCodecV2 implements TransactionEditCodec {
+    /**
+     * Reads the V2 fields, followed by the V3 additions: the transaction type ordinal (-1 for none), the set
+     * of transaction ids to truncate from the invalid list, and the truncation time.
+     *
+     * @param dest the instance on which to set the deserialized values
+     * @param in the input stream containing the serialized data
+     * @throws IOException if the type ordinal is out of range, or if reading fails
+     */
+    @Override
+    public void decode(TransactionEdit dest, DataInput in) throws IOException {
+      super.decode(dest, in);
+      int typeIdx = in.readInt();
+      // null transaction type is represented as -1
+      if (typeIdx < 0) {
+        dest.setType(null);
+      } else {
+        TransactionType[] types = TransactionType.values();
+        // validate the ordinal up front rather than using ArrayIndexOutOfBoundsException as control flow
+        if (typeIdx >= types.length) {
+          throw new IOException("Type enum ordinal value is out of range: " + typeIdx);
+        }
+        dest.setType(types[typeIdx]);
+      }
+
+      int truncateInvalidTxSize = in.readInt();
+      // Always decode into a fresh mutable set.  The set currently held by dest may be the immutable
+      // Collections.emptySet() installed by the TransactionEdit constructor, or a set shared with a caller,
+      // so clearing and refilling it in place could fail or change data the caller still holds.
+      Set<Long> truncateInvalidTx = Sets.newHashSet();
+      for (int i = 0; i < truncateInvalidTxSize; i++) {
+        truncateInvalidTx.add(in.readLong());
+      }
+      dest.setTruncateInvalidTx(truncateInvalidTx);
+      dest.setTruncateInvalidTxTime(in.readLong());
+    }
+
+    @Override
+    public void encode(TransactionEdit src, DataOutput out) throws IOException {
+      super.encode(src, out);
+      // null transaction type is represented as -1
+      if (src.getType() == null) {
+        out.writeInt(-1);
+      } else {
+        out.writeInt(src.getType().ordinal());
+      }
+
+      Set<Long> truncateInvalidTx = src.getTruncateInvalidTx();
+      if (truncateInvalidTx == null) {
+        out.writeInt(0);
+      } else {
+        out.writeInt(truncateInvalidTx.size());
+        for (long id : truncateInvalidTx) {
+          out.writeLong(id);
+        }
+      }
+      out.writeLong(src.getTruncateInvalidTxTime());
+    }
+
+    @Override
+    public byte getVersion() {
+      return -3;
+    }
+  }
+
+  static class TransactionEditCodecV4 extends TransactionEditCodecV3 {
+    /**
+     * Reads the V3 fields, followed by the V4 additions: the parent write pointer and the checkpoint
+     * pointers (a length followed by that many longs, with a negative length standing for {@code null}).
+     *
+     * @param dest the instance on which to set the deserialized values
+     * @param in the input stream containing the serialized data
+     * @throws IOException if reading from the stream fails
+     */
+    @Override
+    public void decode(TransactionEdit dest, DataInput in) throws IOException {
+      super.decode(dest, in);
+      dest.setParentWritePointer(in.readLong());
+      int checkpointPointersLen = in.readInt();
+      if (checkpointPointersLen >= 0) {
+        long[] checkpointPointers = new long[checkpointPointersLen];
+        for (int i = 0; i < checkpointPointersLen; i++) {
+          checkpointPointers[i] = in.readLong();
+        }
+        dest.setCheckpointPointers(checkpointPointers);
+      } else {
+        // encode() writes -1 for a null array; reset explicitly so a reused dest does not keep stale pointers
+        dest.setCheckpointPointers(null);
+      }
+    }
+
+    @Override
+    public void encode(TransactionEdit src, DataOutput out) throws IOException {
+      super.encode(src, out);
+      out.writeLong(src.getParentWritePointer());
+      long[] checkpointPointers = src.getCheckpointPointers();
+      if (checkpointPointers == null) {
+        out.writeInt(-1);
+      } else {
+        out.writeInt(checkpointPointers.length);
+        for (int i = 0; i < checkpointPointers.length; i++) {
+          out.writeLong(checkpointPointers[i]);
+        }
+      }
+    }
+
+    @Override
+    public byte getVersion() {
+      return -4;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/97602a0e/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
index 61ee3cc..1c4fafc 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
@@ -345,7 +345,7 @@ public class TxConstants {
      */
     public static final String NUM_ENTRIES_APPENDED = "count";
     public static final String VERSION_KEY = "version";
-    public static final byte CURRENT_VERSION = 2;
+    public static final byte CURRENT_VERSION = 3;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/97602a0e/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java b/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java
index b1e0978..cf97c92 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java
@@ -18,6 +18,7 @@
 
 package org.apache.tephra.persist;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
@@ -248,4 +249,44 @@ public abstract class AbstractTransactionLog implements TransactionLog {
       this.edit.readFields(in);
     }
   }
+
+  /**
+   * Log entry pairing a {@link LongWritable} key with a {@code co.cask.tephra.persist.TransactionEdit}.
+   * Package private so that tests can reach it.
+   */
+  @Deprecated
+  @VisibleForTesting
+  static class CaskEntry implements Writable {
+    private LongWritable key;
+    private co.cask.tephra.persist.TransactionEdit edit;
+
+    /** No-arg constructor for {@link Writable}; starts with an empty key and an empty edit. */
+    public CaskEntry() {
+      this(new LongWritable(), new co.cask.tephra.persist.TransactionEdit());
+    }
+
+    /** Creates an entry that holds the given key and edit. */
+    public CaskEntry(LongWritable key, co.cask.tephra.persist.TransactionEdit edit) {
+      this.key = key;
+      this.edit = edit;
+    }
+
+    /** Returns the key of this entry. */
+    public LongWritable getKey() {
+      return key;
+    }
+
+    /** Returns the edit of this entry. */
+    public co.cask.tephra.persist.TransactionEdit getEdit() {
+      return edit;
+    }
+
+    /** Writes the key followed by the edit. */
+    @Override
+    public void write(DataOutput out) throws IOException {
+      key.write(out);
+      edit.write(out);
+    }
+
+    /** Reads the key followed by the edit, in the order used by {@link #write(DataOutput)}. */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      key.readFields(in);
+      edit.readFields(in);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/97602a0e/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java
index a517903..1bddc31 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java
@@ -45,6 +45,9 @@ public class HDFSTransactionLogReaderSupplier implements Supplier<TransactionLog
     }
 
     switch (version) {
+      case 3:
+        logReader = new HDFSTransactionLogReaderV3(reader);
+        return logReader;
       case 2:
         logReader = new HDFSTransactionLogReaderV2(reader);
         return logReader;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/97602a0e/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java
index faefaec..38b74d8 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java
@@ -53,8 +53,9 @@ public class HDFSTransactionLogReaderV1 implements TransactionLogReader {
     }
 
     try {
-      boolean successful = reader.next(key, reuse);
-      return successful ? reuse : null;
+      co.cask.tephra.persist.TransactionEdit oldTxEdit = new co.cask.tephra.persist.TransactionEdit();
+      boolean successful = reader.next(key, oldTxEdit);
+      return successful ? TransactionEdit.convertCaskTxEdit(oldTxEdit) : null;
     } catch (EOFException e) {
       LOG.warn("Hit an unexpected EOF while trying to read the Transaction Edit. Skipping the entry.", e);
       return null;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/97602a0e/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java
index ce50da8..e371b98 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java
@@ -38,7 +38,7 @@ public class HDFSTransactionLogReaderV2 implements TransactionLogReader {
   private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLogReaderV2.class);
 
   private final SequenceFile.Reader reader;
-  private final Queue<TransactionEdit> transactionEdits;
+  private final Queue<co.cask.tephra.persist.TransactionEdit> transactionEdits;
   private final CommitMarkerCodec commitMarkerCodec;
   private final LongWritable key;
 
@@ -76,12 +76,12 @@ public class HDFSTransactionLogReaderV2 implements TransactionLogReader {
     }
 
     if (!transactionEdits.isEmpty()) {
-      return transactionEdits.remove();
+      return TransactionEdit.convertCaskTxEdit(transactionEdits.remove());
     }
 
     // Fetch the 'marker' and read 'marker' number of edits
     populateTransactionEdits();
-    return transactionEdits.poll();
+    return TransactionEdit.convertCaskTxEdit(transactionEdits.poll());
   }
 
   private void populateTransactionEdits() throws IOException {
@@ -96,7 +96,7 @@ public class HDFSTransactionLogReaderV2 implements TransactionLogReader {
     }
 
     for (int i = 0; i < numEntries; i++) {
-      TransactionEdit edit = new TransactionEdit();
+      co.cask.tephra.persist.TransactionEdit edit = new co.cask.tephra.persist.TransactionEdit();
       try {
         if (reader.next(key, edit)) {
           transactionEdits.add(edit);

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/97602a0e/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV3.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV3.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV3.java
new file mode 100644
index 0000000..3670e3f
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV3.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+/**
+ * {@link TransactionLogReader} that can read v3 version of Transaction logs. The logs are expected to
+ * have commit markers that indicates the size of the batch of {@link TransactionEdit}s (follows the marker),
+ * that were synced together. If the expected number of {@link TransactionEdit}s are not present then that set of
+ * {@link TransactionEdit}s are discarded.
+ */
+public class HDFSTransactionLogReaderV3 implements TransactionLogReader {
+  private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLogReaderV2.class);
+
+  private final SequenceFile.Reader reader;
+  private final Queue<TransactionEdit> transactionEdits;
+  private final CommitMarkerCodec commitMarkerCodec;
+  private final LongWritable key;
+
+  private boolean closed;
+
+  public HDFSTransactionLogReaderV3(SequenceFile.Reader reader) {
+    this.reader = reader;
+    this.transactionEdits = new ArrayDeque<>();
+    this.key = new LongWritable();
+    this.commitMarkerCodec = new CommitMarkerCodec();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    try {
+      commitMarkerCodec.close();
+    } finally {
+      reader.close();
+      closed = true;
+    }
+  }
+
+  @Override
+  public TransactionEdit next() throws IOException {
+    return next(null);
+  }
+
+  @Override
+  public TransactionEdit next(TransactionEdit reuse) throws IOException {
+    if (closed) {
+      return null;
+    }
+
+    if (!transactionEdits.isEmpty()) {
+      return transactionEdits.remove();
+    }
+
+    // Fetch the 'marker' and read 'marker' number of edits
+    populateTransactionEdits();
+    return transactionEdits.poll();
+  }
+
+  private void populateTransactionEdits() throws IOException {
+    // read the marker to determine numEntries to read.
+    int numEntries = 0;
+    try {
+      // can throw EOFException if reading of incomplete commit marker, no other action required since we can safely
+      // ignore this
+      numEntries = commitMarkerCodec.readMarker(reader);
+    } catch (EOFException e) {
+      LOG.warn("Reached EOF in log while trying to read commit marker", e);
+    }
+
+    for (int i = 0; i < numEntries; i++) {
+      TransactionEdit edit = new TransactionEdit();
+      try {
+        if (reader.next(key, edit)) {
+          transactionEdits.add(edit);
+        } else {
+          throw new EOFException("Attempt to read TransactionEdit failed.");
+        }
+      } catch (EOFException e) {
+        // we have reached EOF before reading back numEntries, we clear the partial list and return.
+        LOG.warn("Reached EOF in log before reading {} entries. Ignoring all {} edits since the last marker",
+                 numEntries, transactionEdits.size(), e);
+        transactionEdits.clear();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/97602a0e/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java
index 1d07e72..8702b5b 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java
@@ -298,6 +298,21 @@ public class TransactionEdit implements Writable {
         parentWritePointer, null);
   }
 
+  /**
+   * Creates a new instance from {@link co.cask.tephra.persist.TransactionEdit}.
+   */
+  @Deprecated
+  public static TransactionEdit convertCaskTxEdit(co.cask.tephra.persist.TransactionEdit txEdit) {
+    if (txEdit == null) {
+      return null;
+    }
+    return new TransactionEdit(txEdit.getWritePointer(), txEdit.getVisibilityUpperBound(),
+                               TransactionEdit.State.valueOf(txEdit.getState().toString()), txEdit.getExpiration(),
+                               txEdit.getChanges(), txEdit.getCommitPointer(), txEdit.getCanCommit(), txEdit.getType(),
+                               txEdit.getTruncateInvalidTx(), txEdit.getTruncateInvalidTxTime(),
+                               txEdit.getParentWritePointer(), txEdit.getCheckpointPointers());
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     TransactionEditCodecs.encode(this, out);

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/97602a0e/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java
index 7b9f06b..7a34e55 100644
--- a/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java
@@ -86,16 +86,26 @@ public class HDFSTransactionLogTest {
   }
 
+  /**
+   * Creates a sequence file writer for a new log file, named after {@code timeInMillis}, in the configured
+   * snapshot directory.
+   *
+   * @param versionNumber log version to emulate: values above 1 are recorded in the file metadata, and
+   *                      versions 1 and 2 use the legacy co.cask edit class as the value type
+   */
   private SequenceFile.Writer getSequenceFileWriter(Configuration configuration, FileSystem fs,
-                                                    long timeInMillis, boolean withMarker) throws IOException {
+                                                    long timeInMillis, byte versionNumber) throws IOException {
     String snapshotDir = configuration.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR);
     Path newLog = new Path(snapshotDir, LOG_FILE_PREFIX + timeInMillis);
     SequenceFile.Metadata metadata = new SequenceFile.Metadata();
-    if (withMarker) {
+    // version 1 files get no version entry in their metadata
+    if (versionNumber > 1) {
       metadata.set(new Text(TxConstants.TransactionLog.VERSION_KEY),
-                   new Text(Byte.toString(TxConstants.TransactionLog.CURRENT_VERSION)));
+                   new Text(Byte.toString(versionNumber)));
+    }
+
+    switch (versionNumber) {
+      case 1:
+      case 2:
+        // versions 1 and 2 store the legacy co.cask edit class as the value type
+        return SequenceFile.createWriter(fs, configuration, newLog, LongWritable.class,
+                                         co.cask.tephra.persist.TransactionEdit.class,
+                                         SequenceFile.CompressionType.NONE, null, null, metadata);
+      default:
+        return SequenceFile.createWriter(fs, configuration, newLog, LongWritable.class,
+                                         TransactionEdit.class, SequenceFile.CompressionType.NONE,
+                                         null, null, metadata);
     }
-    return SequenceFile.createWriter(fs, configuration, newLog, LongWritable.class,
-                                     TransactionEdit.class, SequenceFile.CompressionType.NONE, null, null, metadata);
   }
 
   private void writeNumWrites(SequenceFile.Writer writer, final int size) throws Exception {
@@ -103,33 +113,101 @@ public class HDFSTransactionLogTest {
     CommitMarkerCodec.writeMarker(writer, size);
   }
 
-  private void testTransactionLogSync(int totalCount, int batchSize, boolean withMarker, boolean isComplete)
+  /**
+   * Writes {@code totalCount} legacy (co.cask) edits to a new log file in batches of {@code batchSize}, reads
+   * the log back and checks how many edits the reader returns: all of them for a complete log, all but the
+   * last batch otherwise.
+   *
+   * @param totalCount number of edits to generate and write
+   * @param batchSize number of edits written per sync; for versions above 1 every batch is preceded by a marker
+   * @param versionNumber log version to write, passed on to getSequenceFileWriter
+   * @param isComplete if {@code false}, the very last entry is written truncated so that reading it hits EOF
+   */
+  private void testCaskTransactionLogSync(int totalCount, int batchSize, byte versionNumber, boolean isComplete)
     throws Exception {
-    List<TransactionEdit> edits = TransactionEditUtil.createRandomEdits(totalCount);
+    List<co.cask.tephra.persist.TransactionEdit> edits = TransactionEditUtil.createRandomCaskEdits(totalCount);
     long timestamp = System.currentTimeMillis();
     Configuration configuration = getConfiguration();
     FileSystem fs = FileSystem.newInstance(FileSystem.getDefaultUri(configuration), configuration);
-    SequenceFile.Writer writer = getSequenceFileWriter(configuration, fs, timestamp, withMarker);
+    SequenceFile.Writer writer = getSequenceFileWriter(configuration, fs, timestamp, versionNumber);
     AtomicLong logSequence = new AtomicLong();
     HDFSTransactionLog transactionLog = getHDFSTransactionLog(configuration, fs, timestamp);
-    AbstractTransactionLog.Entry entry;
+    AbstractTransactionLog.CaskEntry entry;
 
     for (int i = 0; i < totalCount - batchSize; i += batchSize) {
-      if (withMarker) {
+      if (versionNumber > 1) {
         writeNumWrites(writer, batchSize);
       }
       for (int j = 0; j < batchSize; j++) {
-        entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(j));
+        // i + j visits every generated edit exactly once; a bare j would re-write the first batch each time
+        entry = new AbstractTransactionLog.CaskEntry(new LongWritable(logSequence.getAndIncrement()),
+                                                     edits.get(i + j));
         writer.append(entry.getKey(), entry.getEdit());
       }
       writer.syncFs();
     }
 
-    if (withMarker) {
+    if (versionNumber > 1) {
       writeNumWrites(writer, batchSize);
     }
 
     for (int i = totalCount - batchSize; i < totalCount - 1; i++) {
+      entry = new AbstractTransactionLog.CaskEntry(new LongWritable(logSequence.getAndIncrement()), edits.get(i));
+      writer.append(entry.getKey(), entry.getEdit());
+    }
+
+    entry = new AbstractTransactionLog.CaskEntry(new LongWritable(logSequence.getAndIncrement()),
+                                                 edits.get(totalCount - 1));
+    if (isComplete) {
+      writer.append(entry.getKey(), entry.getEdit());
+    } else {
+      byte[] bytes = Longs.toByteArray(entry.getKey().get());
+      writer.appendRaw(bytes, 0, bytes.length, new SequenceFile.ValueBytes() {
+        @Override
+        public void writeUncompressedBytes(DataOutputStream outStream) throws IOException {
+          byte[] test = new byte[]{0x2};
+          outStream.write(test, 0, 1);
+        }
+
+        @Override
+        public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException {
+          // no-op
+        }
+
+        @Override
+        public int getSize() {
+          // mimic size longer than the actual byte array size written, so we would reach EOF
+          return 12;
+        }
+      });
+    }
+    writer.syncFs();
+    Closeables.closeQuietly(writer);
+
+    // now let's try to read this log
+    TransactionLogReader reader = transactionLog.getReader();
+    int syncedEdits = 0;
+    while (reader.next() != null) {
+      // testing reading the transaction edits
+      syncedEdits++;
+    }
+    if (isComplete) {
+      Assert.assertEquals(totalCount, syncedEdits);
+    } else {
+      // the truncated last entry invalidates its whole batch
+      Assert.assertEquals(totalCount - batchSize, syncedEdits);
+    }
+  }
+
+  private void testTransactionLogSync(int totalCount, int batchSize, byte versionNumber, boolean isComplete)
+    throws Exception {
+    List<TransactionEdit> edits = TransactionEditUtil.createRandomEdits(totalCount);
+    long timestamp = System.currentTimeMillis();
+    Configuration configuration = getConfiguration();
+    FileSystem fs = FileSystem.newInstance(FileSystem.getDefaultUri(configuration), configuration);
+    SequenceFile.Writer writer = getSequenceFileWriter(configuration, fs, timestamp, versionNumber);
+    AtomicLong logSequence = new AtomicLong();
+    HDFSTransactionLog transactionLog = getHDFSTransactionLog(configuration, fs, timestamp);
+    AbstractTransactionLog.Entry entry;
+
+    for (int i = 0; i < totalCount - batchSize; i += batchSize) {
+      writeNumWrites(writer, batchSize);
+      for (int j = 0; j < batchSize; j++) {
+        entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(j));
+        writer.append(entry.getKey(), entry.getEdit());
+      }
+      writer.syncFs();
+    }
+
+    writeNumWrites(writer, batchSize);
+    for (int i = totalCount - batchSize; i < totalCount - 1; i++) {
       entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(i));
       writer.append(entry.getKey(), entry.getEdit());
     }
@@ -177,22 +255,33 @@ public class HDFSTransactionLogTest {
   }
 
+  /**
+   * Exercises testTransactionLogSync with version number 3, for batch sizes 1 and 5, first with
+   * {@code isComplete} set to {@code false} and then to {@code true}.
+   */
   @Test
-  public void testTransactionLogNewVersion() throws Exception {
+  public void testTransactionLogVersion3() throws Exception {
+    // in-complete sync
+    testTransactionLogSync(1000, 1, (byte) 3, false);
+    testTransactionLogSync(2000, 5, (byte) 3, false);
+
+    // complete sync
+    testTransactionLogSync(1000, 1, (byte) 3, true);
+    testTransactionLogSync(2000, 5, (byte) 3, true);
+  }
+
+  /**
+   * Exercises testCaskTransactionLogSync with version number 2, i.e. logs holding legacy co.cask edits preceded
+   * by batch markers, for batch sizes 1 and 5, with and without a truncated last entry.
+   */
+  @Test
+  public void testTransactionLogVersion2() throws Exception {
     // in-complete sync
-    testTransactionLogSync(1000, 1, true, false);
-    testTransactionLogSync(2000, 5, true, false);
+    testCaskTransactionLogSync(1000, 1, (byte) 2, false);
+    testCaskTransactionLogSync(2000, 5, (byte) 2, false);
 
     // complete sync
-    testTransactionLogSync(1000, 1, true, true);
-    testTransactionLogSync(2000, 5, true, true);
+    testCaskTransactionLogSync(1000, 1, (byte) 2, true);
+    testCaskTransactionLogSync(2000, 5, (byte) 2, true);
   }
 
+  /**
+   * Exercises testCaskTransactionLogSync with version number 1, i.e. logs holding legacy co.cask edits
+   * without batch markers, once with a truncated last entry and once fully written.
+   */
   @Test
   public void testTransactionLogOldVersion() throws Exception {
     // in-complete sync
-    testTransactionLogSync(1000, 1, false, false);
+    testCaskTransactionLogSync(1000, 1, (byte) 1, false);
 
     // complete sync
-    testTransactionLogSync(2000, 5, false, true);
+    testCaskTransactionLogSync(2000, 5, (byte) 1, true);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/97602a0e/tephra-core/src/test/java/org/apache/tephra/util/TransactionEditUtil.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/util/TransactionEditUtil.java b/tephra-core/src/test/java/org/apache/tephra/util/TransactionEditUtil.java
index 854ccdd..52e82d9 100644
--- a/tephra-core/src/test/java/org/apache/tephra/util/TransactionEditUtil.java
+++ b/tephra-core/src/test/java/org/apache/tephra/util/TransactionEditUtil.java
@@ -24,6 +24,7 @@ import org.apache.tephra.ChangeId;
 import org.apache.tephra.TransactionType;
 import org.apache.tephra.persist.TransactionEdit;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
@@ -36,38 +37,53 @@ public final class TransactionEditUtil {
 
   /**
    * Generates a number of semi-random {@link TransactionEdit} instances.
+   * They are obtained by generating legacy edits with {@link #createRandomCaskEdits(int)} and converting each one.
+   *
+   * @param numEntries how many entries to generate in the returned list.
+   * @return a list of randomly generated transaction log edits.
+   */
+  public static List<TransactionEdit> createRandomEdits(int numEntries) {
+    List<co.cask.tephra.persist.TransactionEdit> caskTxEdits = createRandomCaskEdits(numEntries);
+    // presized: exactly one converted edit is added per legacy edit
+    List<TransactionEdit> txEdits = new ArrayList<>(caskTxEdits.size());
+    for (co.cask.tephra.persist.TransactionEdit caskTxEdit : caskTxEdits) {
+      txEdits.add(TransactionEdit.convertCaskTxEdit(caskTxEdit));
+    }
+    return txEdits;
+  }
+
+  /**
+   * Generates a number of semi-random {@link co.cask.tephra.persist.TransactionEdit} instances.
    * These are just randomly selected from the possible states, so would not necessarily reflect a real-world
    * distribution.
    *
    * @param numEntries how many entries to generate in the returned list.
    * @return a list of randomly generated transaction log edits.
    */
-  public static List<TransactionEdit> createRandomEdits(int numEntries) {
-    List<TransactionEdit> edits = Lists.newArrayListWithCapacity(numEntries);
+  @Deprecated
+  public static List<co.cask.tephra.persist.TransactionEdit> createRandomCaskEdits(int numEntries) {
+    List<co.cask.tephra.persist.TransactionEdit> edits = Lists.newArrayListWithCapacity(numEntries);
     for (int i = 0; i < numEntries; i++) {
-      TransactionEdit.State nextType = TransactionEdit.State.values()[random.nextInt(6)];
+      co.cask.tephra.persist.TransactionEdit.State nextType =
+        co.cask.tephra.persist.TransactionEdit.State.values()[random.nextInt(6)];
       long writePointer = Math.abs(random.nextLong());
       switch (nextType) {
         case INPROGRESS:
           edits.add(
-            TransactionEdit.createStarted(writePointer, writePointer - 1,
-                                          System.currentTimeMillis() + 300000L, TransactionType.SHORT));
+            co.cask.tephra.persist.TransactionEdit.createStarted(writePointer, writePointer - 1,
+                                                                 System.currentTimeMillis() + 300000L,
+                                                                 TransactionType.SHORT));
           break;
         case COMMITTING:
-          edits.add(TransactionEdit.createCommitting(writePointer, generateChangeSet(10)));
+          edits.add(co.cask.tephra.persist.TransactionEdit.createCommitting(writePointer, generateChangeSet(10)));
           break;
         case COMMITTED:
-          edits.add(TransactionEdit.createCommitted(writePointer, generateChangeSet(10), writePointer + 1,
-                                                    random.nextBoolean()));
+          edits.add(co.cask.tephra.persist.TransactionEdit.createCommitted(writePointer, generateChangeSet(10),
+                                                                           writePointer + 1, random.nextBoolean()));
           break;
         case INVALID:
-          edits.add(TransactionEdit.createInvalid(writePointer));
+          edits.add(co.cask.tephra.persist.TransactionEdit.createInvalid(writePointer));
           break;
         case ABORTED:
-          edits.add(TransactionEdit.createAborted(writePointer, TransactionType.SHORT, null));
+          edits.add(co.cask.tephra.persist.TransactionEdit.createAborted(writePointer, TransactionType.SHORT, null));
           break;
         case MOVE_WATERMARK:
-          edits.add(TransactionEdit.createMoveWatermark(writePointer));
+          edits.add(co.cask.tephra.persist.TransactionEdit.createMoveWatermark(writePointer));
           break;
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/97602a0e/tephra-examples/src/main/java/org/apache/tephra/examples/BalanceBooks.java
----------------------------------------------------------------------
diff --git a/tephra-examples/src/main/java/org/apache/tephra/examples/BalanceBooks.java b/tephra-examples/src/main/java/org/apache/tephra/examples/BalanceBooks.java
index 17c1005..e191f5c 100644
--- a/tephra-examples/src/main/java/org/apache/tephra/examples/BalanceBooks.java
+++ b/tephra-examples/src/main/java/org/apache/tephra/examples/BalanceBooks.java
@@ -68,7 +68,7 @@ import java.util.Random;
  * <p>
  *   You can run the BalanceBooks application with the following command:
  *   <pre>
- *     ./bin/tephra run BalanceBooks [num clients] [num iterations]
+ *     ./bin/tephra run org.apache.tephra.examples.BalanceBooks [num clients] [num iterations]
  *   </pre>
  *   where <code>[num clients]</code> is the number of concurrent client threads to use, and
  *   <code>[num iterations]</code> is the number of "transfer" operations to perform per client thread.


[2/2] incubator-tephra git commit: Merge branch 'feature/backward-compat' For backward compatibility with old TransactionLog written with cask package TransactionEdit classes, bump up the TransactionLog version and change the logic in previous Transactio

Posted by go...@apache.org.
Merge branch 'feature/backward-compat'
For backward compatibility with old TransactionLog written with cask package TransactionEdit classes,
bump up the TransactionLog version and change the logic in previous TransactionLog readers to read the old cask package based TransactionEdit class.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/3918b8a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/3918b8a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/3918b8a8

Branch: refs/heads/master
Commit: 3918b8a8c1c960ae27c984d14ff7c14ce9a64026
Parents: ab52491 97602a0
Author: Gokul Gunasekaran <go...@apache.org>
Authored: Thu May 12 18:24:25 2016 -0700
Committer: Gokul Gunasekaran <go...@apache.org>
Committed: Thu May 12 18:24:25 2016 -0700

----------------------------------------------------------------------
 .../co/cask/tephra/persist/TransactionEdit.java | 364 +++++++++++++++++++
 .../tephra/persist/TransactionEditCodecs.java   | 315 ++++++++++++++++
 .../java/org/apache/tephra/TxConstants.java     |   2 +-
 .../tephra/persist/AbstractTransactionLog.java  |  41 +++
 .../HDFSTransactionLogReaderSupplier.java       |   3 +
 .../persist/HDFSTransactionLogReaderV1.java     |   5 +-
 .../persist/HDFSTransactionLogReaderV2.java     |   8 +-
 .../persist/HDFSTransactionLogReaderV3.java     | 114 ++++++
 .../apache/tephra/persist/TransactionEdit.java  |  15 +
 .../tephra/persist/HDFSTransactionLogTest.java  | 127 ++++++-
 .../apache/tephra/util/TransactionEditUtil.java |  38 +-
 .../apache/tephra/examples/BalanceBooks.java    |   2 +-
 12 files changed, 996 insertions(+), 38 deletions(-)
----------------------------------------------------------------------