You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by go...@apache.org on 2016/05/12 20:35:12 UTC

incubator-tephra git commit: Create a v3 TxLogReader and deprecate v2 which contains logs written with old cask classes

Repository: incubator-tephra
Updated Branches:
  refs/heads/feature/backward-compat c6252e607 -> bc86dae1f


Create a v3 TxLogReader and deprecate v2 which contains logs written with old cask classes


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/bc86dae1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/bc86dae1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/bc86dae1

Branch: refs/heads/feature/backward-compat
Commit: bc86dae1fad33582f2b14894faffdbae2885d5ed
Parents: c6252e6
Author: Gokul Gunasekaran <go...@cask.co>
Authored: Thu May 12 13:35:04 2016 -0700
Committer: Gokul Gunasekaran <go...@cask.co>
Committed: Thu May 12 13:35:04 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/tephra/TxConstants.java     |   2 +-
 .../tephra/persist/AbstractTransactionLog.java  |  40 ++++++
 .../HDFSTransactionLogReaderSupplier.java       |   3 +
 .../persist/HDFSTransactionLogReaderV1.java     |   5 +-
 .../persist/HDFSTransactionLogReaderV2.java     |   8 +-
 .../persist/HDFSTransactionLogReaderV3.java     | 114 +++++++++++++++++
 .../apache/tephra/persist/TransactionEdit.java  |  11 ++
 .../tephra/persist/HDFSTransactionLogTest.java  | 127 ++++++++++++++++---
 .../apache/tephra/util/TransactionEditUtil.java |  34 +++--
 9 files changed, 307 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/bc86dae1/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
index 61ee3cc..1c4fafc 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
@@ -345,7 +345,7 @@ public class TxConstants {
      */
     public static final String NUM_ENTRIES_APPENDED = "count";
     public static final String VERSION_KEY = "version";
-    public static final byte CURRENT_VERSION = 2;
+    public static final byte CURRENT_VERSION = 3;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/bc86dae1/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java b/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java
index b1e0978..93aeef7 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java
@@ -18,6 +18,7 @@
 
 package org.apache.tephra.persist;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
@@ -248,4 +249,43 @@ public abstract class AbstractTransactionLog implements TransactionLog {
       this.edit.readFields(in);
     }
   }
+
+  // package private for testing
+  @VisibleForTesting
+  static class CaskEntry implements Writable {
+    private LongWritable key;
+    private co.cask.tephra.persist.TransactionEdit edit;
+
+
+    // for Writable
+    public CaskEntry() {
+      this.key = new LongWritable();
+      this.edit = new co.cask.tephra.persist.TransactionEdit();
+    }
+
+    public CaskEntry(LongWritable key, co.cask.tephra.persist.TransactionEdit edit) {
+      this.key = key;
+      this.edit = edit;
+    }
+
+    public LongWritable getKey() {
+      return this.key;
+    }
+
+    public co.cask.tephra.persist.TransactionEdit getEdit() {
+      return this.edit;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      this.key.write(out);
+      this.edit.write(out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      this.key.readFields(in);
+      this.edit.readFields(in);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/bc86dae1/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java
index a517903..1bddc31 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java
@@ -45,6 +45,9 @@ public class HDFSTransactionLogReaderSupplier implements Supplier<TransactionLog
     }
 
     switch (version) {
+      case 3:
+        logReader = new HDFSTransactionLogReaderV3(reader);
+        return logReader;
       case 2:
         logReader = new HDFSTransactionLogReaderV2(reader);
         return logReader;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/bc86dae1/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java
index faefaec..38b74d8 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java
@@ -53,8 +53,9 @@ public class HDFSTransactionLogReaderV1 implements TransactionLogReader {
     }
 
     try {
-      boolean successful = reader.next(key, reuse);
-      return successful ? reuse : null;
+      co.cask.tephra.persist.TransactionEdit oldTxEdit = new co.cask.tephra.persist.TransactionEdit();
+      boolean successful = reader.next(key, oldTxEdit);
+      return successful ? TransactionEdit.convertCaskTxEdit(oldTxEdit) : null;
     } catch (EOFException e) {
       LOG.warn("Hit an unexpected EOF while trying to read the Transaction Edit. Skipping the entry.", e);
       return null;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/bc86dae1/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java
index ce50da8..e371b98 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java
@@ -38,7 +38,7 @@ public class HDFSTransactionLogReaderV2 implements TransactionLogReader {
   private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLogReaderV2.class);
 
   private final SequenceFile.Reader reader;
-  private final Queue<TransactionEdit> transactionEdits;
+  private final Queue<co.cask.tephra.persist.TransactionEdit> transactionEdits;
   private final CommitMarkerCodec commitMarkerCodec;
   private final LongWritable key;
 
@@ -76,12 +76,12 @@ public class HDFSTransactionLogReaderV2 implements TransactionLogReader {
     }
 
     if (!transactionEdits.isEmpty()) {
-      return transactionEdits.remove();
+      return TransactionEdit.convertCaskTxEdit(transactionEdits.remove());
     }
 
     // Fetch the 'marker' and read 'marker' number of edits
     populateTransactionEdits();
-    return transactionEdits.poll();
+    return TransactionEdit.convertCaskTxEdit(transactionEdits.poll());
   }
 
   private void populateTransactionEdits() throws IOException {
@@ -96,7 +96,7 @@ public class HDFSTransactionLogReaderV2 implements TransactionLogReader {
     }
 
     for (int i = 0; i < numEntries; i++) {
-      TransactionEdit edit = new TransactionEdit();
+      co.cask.tephra.persist.TransactionEdit edit = new co.cask.tephra.persist.TransactionEdit();
       try {
         if (reader.next(key, edit)) {
           transactionEdits.add(edit);

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/bc86dae1/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV3.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV3.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV3.java
new file mode 100644
index 0000000..3670e3f
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV3.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+/**
+ * {@link TransactionLogReader} that can read v3 version of Transaction logs. The logs are expected to
+ * have commit markers that indicates the size of the batch of {@link TransactionEdit}s (follows the marker),
+ * that were synced together. If the expected number of {@link TransactionEdit}s are not present then that set of
+ * {@link TransactionEdit}s are discarded.
+ */
+public class HDFSTransactionLogReaderV3 implements TransactionLogReader {
+  private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLogReaderV2.class);
+
+  private final SequenceFile.Reader reader;
+  private final Queue<TransactionEdit> transactionEdits;
+  private final CommitMarkerCodec commitMarkerCodec;
+  private final LongWritable key;
+
+  private boolean closed;
+
+  public HDFSTransactionLogReaderV3(SequenceFile.Reader reader) {
+    this.reader = reader;
+    this.transactionEdits = new ArrayDeque<>();
+    this.key = new LongWritable();
+    this.commitMarkerCodec = new CommitMarkerCodec();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    try {
+      commitMarkerCodec.close();
+    } finally {
+      reader.close();
+      closed = true;
+    }
+  }
+
+  @Override
+  public TransactionEdit next() throws IOException {
+    return next(null);
+  }
+
+  @Override
+  public TransactionEdit next(TransactionEdit reuse) throws IOException {
+    if (closed) {
+      return null;
+    }
+
+    if (!transactionEdits.isEmpty()) {
+      return transactionEdits.remove();
+    }
+
+    // Fetch the 'marker' and read 'marker' number of edits
+    populateTransactionEdits();
+    return transactionEdits.poll();
+  }
+
+  private void populateTransactionEdits() throws IOException {
+    // read the marker to determine numEntries to read.
+    int numEntries = 0;
+    try {
+      // can throw EOFException if reading of incomplete commit marker, no other action required since we can safely
+      // ignore this
+      numEntries = commitMarkerCodec.readMarker(reader);
+    } catch (EOFException e) {
+      LOG.warn("Reached EOF in log while trying to read commit marker", e);
+    }
+
+    for (int i = 0; i < numEntries; i++) {
+      TransactionEdit edit = new TransactionEdit();
+      try {
+        if (reader.next(key, edit)) {
+          transactionEdits.add(edit);
+        } else {
+          throw new EOFException("Attempt to read TransactionEdit failed.");
+        }
+      } catch (EOFException e) {
+        // we have reached EOF before reading back numEntries, we clear the partial list and return.
+        LOG.warn("Reached EOF in log before reading {} entries. Ignoring all {} edits since the last marker",
+                 numEntries, transactionEdits.size(), e);
+        transactionEdits.clear();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/bc86dae1/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java
index 1d07e72..3555a84 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java
@@ -298,6 +298,17 @@ public class TransactionEdit implements Writable {
         parentWritePointer, null);
   }
 
+  public static TransactionEdit convertCaskTxEdit(co.cask.tephra.persist.TransactionEdit txEdit) {
+    if (txEdit == null) {
+      return null;
+    }
+    return new TransactionEdit(txEdit.getWritePointer(), txEdit.getVisibilityUpperBound(),
+                               TransactionEdit.State.valueOf(txEdit.getState().toString()), txEdit.getExpiration(),
+                               txEdit.getChanges(), txEdit.getCommitPointer(), txEdit.getCanCommit(), txEdit.getType(),
+                               txEdit.getTruncateInvalidTx(), txEdit.getTruncateInvalidTxTime(),
+                               txEdit.getParentWritePointer(), txEdit.getCheckpointPointers());
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     TransactionEditCodecs.encode(this, out);

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/bc86dae1/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java
index 7b9f06b..7a34e55 100644
--- a/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java
@@ -86,16 +86,26 @@ public class HDFSTransactionLogTest {
   }
 
   private SequenceFile.Writer getSequenceFileWriter(Configuration configuration, FileSystem fs,
-                                                    long timeInMillis, boolean withMarker) throws IOException {
+                                                    long timeInMillis, byte versionNumber) throws IOException {
     String snapshotDir = configuration.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR);
     Path newLog = new Path(snapshotDir, LOG_FILE_PREFIX + timeInMillis);
     SequenceFile.Metadata metadata = new SequenceFile.Metadata();
-    if (withMarker) {
+    if (versionNumber > 1) {
       metadata.set(new Text(TxConstants.TransactionLog.VERSION_KEY),
-                   new Text(Byte.toString(TxConstants.TransactionLog.CURRENT_VERSION)));
+                   new Text(Byte.toString(versionNumber)));
+    }
+
+    switch (versionNumber) {
+      case 1:
+      case 2:
+        return SequenceFile.createWriter(fs, configuration, newLog, LongWritable.class,
+                                         co.cask.tephra.persist.TransactionEdit.class,
+                                         SequenceFile.CompressionType.NONE, null, null, metadata);
+      default:
+        return SequenceFile.createWriter(fs, configuration, newLog, LongWritable.class,
+                                         TransactionEdit.class, SequenceFile.CompressionType.NONE,
+                                         null, null, metadata);
     }
-    return SequenceFile.createWriter(fs, configuration, newLog, LongWritable.class,
-                                     TransactionEdit.class, SequenceFile.CompressionType.NONE, null, null, metadata);
   }
 
   private void writeNumWrites(SequenceFile.Writer writer, final int size) throws Exception {
@@ -103,33 +113,101 @@ public class HDFSTransactionLogTest {
     CommitMarkerCodec.writeMarker(writer, size);
   }
 
-  private void testTransactionLogSync(int totalCount, int batchSize, boolean withMarker, boolean isComplete)
+  private void testCaskTransactionLogSync(int totalCount, int batchSize, byte versionNumber, boolean isComplete)
     throws Exception {
-    List<TransactionEdit> edits = TransactionEditUtil.createRandomEdits(totalCount);
+    List<co.cask.tephra.persist.TransactionEdit> edits = TransactionEditUtil.createRandomCaskEdits(totalCount);
     long timestamp = System.currentTimeMillis();
     Configuration configuration = getConfiguration();
     FileSystem fs = FileSystem.newInstance(FileSystem.getDefaultUri(configuration), configuration);
-    SequenceFile.Writer writer = getSequenceFileWriter(configuration, fs, timestamp, withMarker);
+    SequenceFile.Writer writer = getSequenceFileWriter(configuration, fs, timestamp, versionNumber);
     AtomicLong logSequence = new AtomicLong();
     HDFSTransactionLog transactionLog = getHDFSTransactionLog(configuration, fs, timestamp);
-    AbstractTransactionLog.Entry entry;
+    AbstractTransactionLog.CaskEntry entry;
 
     for (int i = 0; i < totalCount - batchSize; i += batchSize) {
-      if (withMarker) {
+      if (versionNumber > 1) {
         writeNumWrites(writer, batchSize);
       }
       for (int j = 0; j < batchSize; j++) {
-        entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(j));
+        entry = new AbstractTransactionLog.CaskEntry(new LongWritable(logSequence.getAndIncrement()), edits.get(j));
         writer.append(entry.getKey(), entry.getEdit());
       }
       writer.syncFs();
     }
 
-    if (withMarker) {
+    if (versionNumber > 1) {
       writeNumWrites(writer, batchSize);
     }
 
     for (int i = totalCount - batchSize; i < totalCount - 1; i++) {
+      entry = new AbstractTransactionLog.CaskEntry(new LongWritable(logSequence.getAndIncrement()), edits.get(i));
+      writer.append(entry.getKey(), entry.getEdit());
+    }
+
+    entry = new AbstractTransactionLog.CaskEntry(new LongWritable(logSequence.getAndIncrement()),
+                                                 edits.get(totalCount - 1));
+    if (isComplete) {
+      writer.append(entry.getKey(), entry.getEdit());
+    } else {
+      byte[] bytes = Longs.toByteArray(entry.getKey().get());
+      writer.appendRaw(bytes, 0, bytes.length, new SequenceFile.ValueBytes() {
+        @Override
+        public void writeUncompressedBytes(DataOutputStream outStream) throws IOException {
+          byte[] test = new byte[]{0x2};
+          outStream.write(test, 0, 1);
+        }
+
+        @Override
+        public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException {
+          // no-op
+        }
+
+        @Override
+        public int getSize() {
+          // mimic size longer than the actual byte array size written, so we would reach EOF
+          return 12;
+        }
+      });
+    }
+    writer.syncFs();
+    Closeables.closeQuietly(writer);
+
+    // now let's try to read this log
+    TransactionLogReader reader = transactionLog.getReader();
+    int syncedEdits = 0;
+    while (reader.next() != null) {
+      // testing reading the transaction edits
+      syncedEdits++;
+    }
+    if (isComplete) {
+      Assert.assertEquals(totalCount, syncedEdits);
+    } else {
+      Assert.assertEquals(totalCount - batchSize, syncedEdits);
+    }
+  }
+
+  private void testTransactionLogSync(int totalCount, int batchSize, byte versionNumber, boolean isComplete)
+    throws Exception {
+    List<TransactionEdit> edits = TransactionEditUtil.createRandomEdits(totalCount);
+    long timestamp = System.currentTimeMillis();
+    Configuration configuration = getConfiguration();
+    FileSystem fs = FileSystem.newInstance(FileSystem.getDefaultUri(configuration), configuration);
+    SequenceFile.Writer writer = getSequenceFileWriter(configuration, fs, timestamp, versionNumber);
+    AtomicLong logSequence = new AtomicLong();
+    HDFSTransactionLog transactionLog = getHDFSTransactionLog(configuration, fs, timestamp);
+    AbstractTransactionLog.Entry entry;
+
+    for (int i = 0; i < totalCount - batchSize; i += batchSize) {
+      writeNumWrites(writer, batchSize);
+      for (int j = 0; j < batchSize; j++) {
+        entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(j));
+        writer.append(entry.getKey(), entry.getEdit());
+      }
+      writer.syncFs();
+    }
+
+    writeNumWrites(writer, batchSize);
+    for (int i = totalCount - batchSize; i < totalCount - 1; i++) {
       entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(i));
       writer.append(entry.getKey(), entry.getEdit());
     }
@@ -177,22 +255,33 @@ public class HDFSTransactionLogTest {
   }
 
   @Test
-  public void testTransactionLogNewVersion() throws Exception {
+  public void testTransactionLogVersion3() throws Exception {
+    // in-complete sync
+    testTransactionLogSync(1000, 1, (byte) 3, false);
+    testTransactionLogSync(2000, 5, (byte) 3, false);
+
+    // complete sync
+    testTransactionLogSync(1000, 1, (byte) 3, true);
+    testTransactionLogSync(2000, 5, (byte) 3, true);
+  }
+
+  @Test
+  public void testTransactionLogVersion2() throws Exception {
     // in-complete sync
-    testTransactionLogSync(1000, 1, true, false);
-    testTransactionLogSync(2000, 5, true, false);
+    testCaskTransactionLogSync(1000, 1, (byte) 2, false);
+    testCaskTransactionLogSync(2000, 5, (byte) 2, false);
 
     // complete sync
-    testTransactionLogSync(1000, 1, true, true);
-    testTransactionLogSync(2000, 5, true, true);
+    testCaskTransactionLogSync(1000, 1, (byte) 2, true);
+    testCaskTransactionLogSync(2000, 5, (byte) 2, true);
   }
 
   @Test
   public void testTransactionLogOldVersion() throws Exception {
     // in-complete sync
-    testTransactionLogSync(1000, 1, false, false);
+    testCaskTransactionLogSync(1000, 1, (byte) 1, false);
 
     // complete sync
-    testTransactionLogSync(2000, 5, false, true);
+    testCaskTransactionLogSync(2000, 5, (byte) 1, true);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/bc86dae1/tephra-core/src/test/java/org/apache/tephra/util/TransactionEditUtil.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/util/TransactionEditUtil.java b/tephra-core/src/test/java/org/apache/tephra/util/TransactionEditUtil.java
index 854ccdd..c6e897a 100644
--- a/tephra-core/src/test/java/org/apache/tephra/util/TransactionEditUtil.java
+++ b/tephra-core/src/test/java/org/apache/tephra/util/TransactionEditUtil.java
@@ -24,6 +24,7 @@ import org.apache.tephra.ChangeId;
 import org.apache.tephra.TransactionType;
 import org.apache.tephra.persist.TransactionEdit;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
@@ -34,6 +35,15 @@ import java.util.Set;
 public final class TransactionEditUtil {
   private static Random random = new Random();
 
+  public static List<TransactionEdit> createRandomEdits(int numEntries) {
+    List<co.cask.tephra.persist.TransactionEdit> caskTxEdits = createRandomCaskEdits(numEntries);
+    List<TransactionEdit> txEdits = new ArrayList<>();
+    for (co.cask.tephra.persist.TransactionEdit caskTxEdit : caskTxEdits) {
+      txEdits.add(TransactionEdit.convertCaskTxEdit(caskTxEdit));
+    }
+    return txEdits;
+  }
+
   /**
    * Generates a number of semi-random {@link TransactionEdit} instances.
    * These are just randomly selected from the possible states, so would not necessarily reflect a real-world
@@ -42,32 +52,34 @@ public final class TransactionEditUtil {
    * @param numEntries how many entries to generate in the returned list.
    * @return a list of randomly generated transaction log edits.
    */
-  public static List<TransactionEdit> createRandomEdits(int numEntries) {
-    List<TransactionEdit> edits = Lists.newArrayListWithCapacity(numEntries);
+  public static List<co.cask.tephra.persist.TransactionEdit> createRandomCaskEdits(int numEntries) {
+    List<co.cask.tephra.persist.TransactionEdit> edits = Lists.newArrayListWithCapacity(numEntries);
     for (int i = 0; i < numEntries; i++) {
-      TransactionEdit.State nextType = TransactionEdit.State.values()[random.nextInt(6)];
+      co.cask.tephra.persist.TransactionEdit.State nextType =
+        co.cask.tephra.persist.TransactionEdit.State.values()[random.nextInt(6)];
       long writePointer = Math.abs(random.nextLong());
       switch (nextType) {
         case INPROGRESS:
           edits.add(
-            TransactionEdit.createStarted(writePointer, writePointer - 1,
-                                          System.currentTimeMillis() + 300000L, TransactionType.SHORT));
+            co.cask.tephra.persist.TransactionEdit.createStarted(writePointer, writePointer - 1,
+                                                                 System.currentTimeMillis() + 300000L,
+                                                                 TransactionType.SHORT));
           break;
         case COMMITTING:
-          edits.add(TransactionEdit.createCommitting(writePointer, generateChangeSet(10)));
+          edits.add(co.cask.tephra.persist.TransactionEdit.createCommitting(writePointer, generateChangeSet(10)));
           break;
         case COMMITTED:
-          edits.add(TransactionEdit.createCommitted(writePointer, generateChangeSet(10), writePointer + 1,
-                                                    random.nextBoolean()));
+          edits.add(co.cask.tephra.persist.TransactionEdit.createCommitted(writePointer, generateChangeSet(10),
+                                                                           writePointer + 1, random.nextBoolean()));
           break;
         case INVALID:
-          edits.add(TransactionEdit.createInvalid(writePointer));
+          edits.add(co.cask.tephra.persist.TransactionEdit.createInvalid(writePointer));
           break;
         case ABORTED:
-          edits.add(TransactionEdit.createAborted(writePointer, TransactionType.SHORT, null));
+          edits.add(co.cask.tephra.persist.TransactionEdit.createAborted(writePointer, TransactionType.SHORT, null));
           break;
         case MOVE_WATERMARK:
-          edits.add(TransactionEdit.createMoveWatermark(writePointer));
+          edits.add(co.cask.tephra.persist.TransactionEdit.createMoveWatermark(writePointer));
           break;
       }
     }