You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@fluo.apache.org by GitBox <gi...@apache.org> on 2018/11/08 15:37:23 UTC

[GitHub] keith-turner closed pull request #1060: Added column type enum

keith-turner closed pull request #1060: Added column type enum
URL: https://github.com/apache/fluo/pull/1060
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/format/FluoFormatter.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/format/FluoFormatter.java
index d87975aa..e202816f 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/format/FluoFormatter.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/format/FluoFormatter.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -21,6 +21,7 @@
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.util.NotificationUtil;
 import org.apache.fluo.accumulo.util.ReadLockUtil;
 import org.apache.fluo.accumulo.values.DelLockValue;
@@ -88,34 +89,21 @@ public static String toString(Entry<Key, Value> entry) {
     } else {
       long ts = key.getTimestamp();
       String type = "";
-
-      if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.TX_DONE_PREFIX) {
-        type = "TX_DONE";
-      }
-      if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.DEL_LOCK_PREFIX) {
-        type = "DEL_LOCK";
-      }
-      if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.LOCK_PREFIX) {
-        type = "LOCK";
-      }
-      if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.DATA_PREFIX) {
-        type = "DATA";
+      ColumnType colType = ColumnType.from(ts);
+
+      switch (colType) {
+        case RLOCK:
+          if (ReadLockUtil.isDelete(ts)) {
+            type = "DEL_RLOCK";
+          } else {
+            type = "RLOCK";
+          }
+          ts = ReadLockUtil.decodeTs(ts);
+          break;
+        default:
+          type = colType.toString();
+          break;
       }
-      if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.WRITE_PREFIX) {
-        type = "WRITE";
-      }
-      if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.ACK_PREFIX) {
-        type = "ACK";
-      }
-      if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.RLOCK_PREFIX) {
-        if (ReadLockUtil.isDelete(ts)) {
-          type = "DEL_RLOCK";
-        } else {
-          type = "RLOCK";
-        }
-        ts = ReadLockUtil.decodeTs(ts);
-      }
-
 
       StringBuilder sb = new StringBuilder();
 
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
index d07f59ee..ad4a8aaa 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
@@ -32,6 +32,7 @@
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.util.ReadLockUtil;
 import org.apache.fluo.accumulo.util.ZookeeperUtil;
 import org.apache.fluo.accumulo.values.DelLockValue;
@@ -125,10 +126,10 @@ public void next() throws IOException {
   private boolean consumeData() throws IOException {
     while (source.hasTop()
         && curCol.equals(source.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
-      long colType = source.getTopKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
+      ColumnType colType = ColumnType.from(source.getTopKey());
       long ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
 
-      if (colType == ColumnConstants.DATA_PREFIX) {
+      if (colType == ColumnType.DATA) {
         if (ts >= truncationTime && !rolledback.contains(ts)) {
           return false;
         }
@@ -173,132 +174,147 @@ private void readColMetadata() throws IOException {
       return;
     }
 
-    while (source.hasTop()
+    loop: while (source.hasTop()
         && curCol.equals(source.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
 
-      long colType = source.getTopKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
+      ColumnType colType = ColumnType.from(source.getTopKey());
       long ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
 
-      if (colType == ColumnConstants.TX_DONE_PREFIX) {
-        keys.add(source.getTopKey(), source.getTopValue());
-        completeTxs.add(ts);
-      } else if (colType == ColumnConstants.WRITE_PREFIX) {
-        boolean keep = false;
-        boolean complete = completeTxs.contains(ts);
-        byte[] val = source.getTopValue().get();
-        long timePtr = WriteValue.getTimestamp(val);
-
-        if (WriteValue.isPrimary(val) && !complete) {
-          keep = true;
+      switch (colType) {
+        case TX_DONE: {
+          keys.add(source.getTopKey(), source.getTopValue());
+          completeTxs.add(ts);
+          break;
         }
+        case WRITE: {
+          boolean keep = false;
+          boolean complete = completeTxs.contains(ts);
+          byte[] val = source.getTopValue().get();
+          long timePtr = WriteValue.getTimestamp(val);
 
-        if (!oldestSeen) {
-          if (firstWrite == -1) {
-            firstWrite = ts;
+          if (WriteValue.isPrimary(val) && !complete) {
+            keep = true;
           }
 
-          if (ts < gcTimestamp) {
-            oldestSeen = true;
-            truncationTime = timePtr;
-            if (!(WriteValue.isDelete(val) && isFullMajc)) {
+          if (!oldestSeen) {
+            if (firstWrite == -1) {
+              firstWrite = ts;
+            }
+
+            if (ts < gcTimestamp) {
+              oldestSeen = true;
+              truncationTime = timePtr;
+              if (!(WriteValue.isDelete(val) && isFullMajc)) {
+                keep = true;
+              }
+            } else {
               keep = true;
             }
-          } else {
-            keep = true;
           }
-        }
 
-        if (timePtr > invalidationTime) {
-          invalidationTime = timePtr;
-        }
+          if (timePtr > invalidationTime) {
+            invalidationTime = timePtr;
+          }
 
-        if (keep) {
-          keys.add(source.getTopKey(), val);
-        } else if (complete) {
-          completeTxs.remove(ts);
+          if (keep) {
+            keys.add(source.getTopKey(), val);
+          } else if (complete) {
+            completeTxs.remove(ts);
+          }
+          break;
         }
-      } else if (colType == ColumnConstants.DEL_LOCK_PREFIX) {
-        boolean keep = false;
-        long txDoneTs = DelLockValue.getTxDoneTimestamp(source.getTopValue().get());
-        boolean complete = completeTxs.contains(txDoneTs);
+        case DEL_LOCK: {
+          boolean keep = false;
+          long txDoneTs = DelLockValue.getTxDoneTimestamp(source.getTopValue().get());
+          boolean complete = completeTxs.contains(txDoneTs);
 
-        byte[] val = source.getTopValue().get();
+          byte[] val = source.getTopValue().get();
 
-        if (!complete && DelLockValue.isPrimary(val)) {
-          keep = true;
-        }
+          if (!complete && DelLockValue.isPrimary(val)) {
+            keep = true;
+          }
 
-        if (DelLockValue.isRollback(val)) {
-          rolledback.add(ts);
-          keep |= !isFullMajc;
-        }
+          if (DelLockValue.isRollback(val)) {
+            rolledback.add(ts);
+            keep |= !isFullMajc;
+          }
 
-        if (ts > invalidationTime) {
-          invalidationTime = ts;
-        }
+          if (ts > invalidationTime) {
+            invalidationTime = ts;
+          }
 
-        if (keep) {
-          keys.add(source.getTopKey(), source.getTopValue());
-        } else if (complete) {
-          completeTxs.remove(txDoneTs);
+          if (keep) {
+            keys.add(source.getTopKey(), source.getTopValue());
+          } else if (complete) {
+            completeTxs.remove(txDoneTs);
+          }
+          break;
         }
-      } else if (colType == ColumnConstants.RLOCK_PREFIX) {
-        boolean keep = false;
-        long rlts = ReadLockUtil.decodeTs(ts);
-        boolean isDelete = ReadLockUtil.isDelete(ts);
+        case RLOCK: {
+          boolean keep = false;
+          long rlts = ReadLockUtil.decodeTs(ts);
+          boolean isDelete = ReadLockUtil.isDelete(ts);
 
-        if (isDelete) {
-          lastReadLockDeleteTs = rlts;
-        }
+          if (isDelete) {
+            lastReadLockDeleteTs = rlts;
+          }
 
-        if (rlts > invalidationTime) {
-          if (isFullMajc) {
-            if (isDelete) {
-              if (DelReadLockValue.isRollback(source.getTopValue().get())) {
-                // can drop rolled back read lock delete markers on any full majc, do not need to
-                // consider gcTimestamp
-                keep = false;
+          if (rlts > invalidationTime) {
+            if (isFullMajc) {
+              if (isDelete) {
+                if (DelReadLockValue.isRollback(source.getTopValue().get())) {
+                  // can drop rolled back read lock delete markers on any full majc, do not need to
+                  // consider gcTimestamp
+                  keep = false;
+                } else {
+                  long rlockCommitTs =
+                      DelReadLockValue.getCommitTimestamp(source.getTopValue().get());
+                  keep = rlockCommitTs >= gcTimestamp;
+                }
               } else {
-                long rlockCommitTs =
-                    DelReadLockValue.getCommitTimestamp(source.getTopValue().get());
-                keep = rlockCommitTs >= gcTimestamp;
+                keep = lastReadLockDeleteTs != rlts;
               }
             } else {
-              keep = lastReadLockDeleteTs != rlts;
+              // can drop deleted read lock entries.. keep the delete entry.
+              keep = isDelete || lastReadLockDeleteTs != rlts;
             }
-          } else {
-            // can drop deleted read lock entries.. keep the delete entry.
-            keep = isDelete || lastReadLockDeleteTs != rlts;
           }
-        }
 
-        if (keep) {
-          keys.add(source.getTopKey(), source.getTopValue());
-        }
-      } else if (colType == ColumnConstants.LOCK_PREFIX) {
-        if (ts > invalidationTime) {
-          keys.add(source.getTopKey(), source.getTopValue());
+          if (keep) {
+            keys.add(source.getTopKey(), source.getTopValue());
+          }
+          break;
         }
-      } else if (colType == ColumnConstants.DATA_PREFIX) {
-        // can stop looking
-        break;
-      } else if (colType == ColumnConstants.ACK_PREFIX) {
-        if (!sawAck) {
-          if (ts >= firstWrite) {
+        case LOCK: {
+          if (ts > invalidationTime) {
             keys.add(source.getTopKey(), source.getTopValue());
           }
-          sawAck = true;
+          break;
         }
-      } else {
-        throw new IllegalArgumentException(" unknown colType " + String.format("%x", colType));
+        case DATA: {
+          // can stop looking
+          break loop;
+        }
+        case ACK: {
+          if (!sawAck) {
+            if (ts >= firstWrite) {
+              keys.add(source.getTopKey(), source.getTopValue());
+            }
+            sawAck = true;
+          }
+          break;
+        }
+
+        default:
+          throw new IllegalArgumentException(" unknown colType " + colType);
+
       }
 
       source.next();
     }
 
     keys.copyTo(keysFiltered, (timestamp -> {
-      long colType = timestamp & ColumnConstants.PREFIX_MASK;
-      if (colType == ColumnConstants.TX_DONE_PREFIX) {
+      if (ColumnType.from(timestamp) == ColumnType.TX_DONE) {
         return completeTxs.contains(timestamp & ColumnConstants.TIMESTAMP_MASK);
       } else {
         return true;
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/OpenReadLockIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/OpenReadLockIterator.java
index dd4de54b..167eff71 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/OpenReadLockIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/OpenReadLockIterator.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -26,17 +26,10 @@
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.util.ReadLockUtil;
 
-import static org.apache.fluo.accumulo.util.ColumnConstants.ACK_PREFIX;
-import static org.apache.fluo.accumulo.util.ColumnConstants.DATA_PREFIX;
-import static org.apache.fluo.accumulo.util.ColumnConstants.DEL_LOCK_PREFIX;
-import static org.apache.fluo.accumulo.util.ColumnConstants.LOCK_PREFIX;
-import static org.apache.fluo.accumulo.util.ColumnConstants.RLOCK_PREFIX;
 import static org.apache.fluo.accumulo.util.ColumnConstants.TIMESTAMP_MASK;
-import static org.apache.fluo.accumulo.util.ColumnConstants.TX_DONE_PREFIX;
-import static org.apache.fluo.accumulo.util.ColumnConstants.WRITE_PREFIX;
 
 public class OpenReadLockIterator implements SortedKeyValueIterator<Key, Value> {
 
@@ -47,35 +40,43 @@
   private void findTop() throws IOException {
     while (source.hasTop()) {
 
-      long colType = source.getTopKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
+      ColumnType colType = ColumnType.from(source.getTopKey());
 
-      if (colType == TX_DONE_PREFIX || colType == WRITE_PREFIX || colType == DEL_LOCK_PREFIX) {
-        source.skipToPrefix(source.getTopKey(), RLOCK_PREFIX);
-        continue;
-      } else if (colType == RLOCK_PREFIX) {
-        if (ReadLockUtil.isDelete(source.getTopKey())) {
-          lastDelete.set(source.getTopKey());
-        } else {
-          if (lastDelete.equals(source.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
-            long ts1 = ReadLockUtil.decodeTs(source.getTopKey().getTimestamp() & TIMESTAMP_MASK);
-            long ts2 = ReadLockUtil.decodeTs(lastDelete.getTimestamp() & TIMESTAMP_MASK);
-
-            if (ts1 != ts2) {
+      switch (colType) {
+        case TX_DONE:
+        case WRITE:
+        case DEL_LOCK: {
+          source.skipToPrefix(source.getTopKey(), ColumnType.RLOCK);
+          break;
+        }
+        case RLOCK: {
+          if (ReadLockUtil.isDelete(source.getTopKey())) {
+            lastDelete.set(source.getTopKey());
+          } else {
+            if (lastDelete.equals(source.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
+              long ts1 = ReadLockUtil.decodeTs(source.getTopKey().getTimestamp() & TIMESTAMP_MASK);
+              long ts2 = ReadLockUtil.decodeTs(lastDelete.getTimestamp() & TIMESTAMP_MASK);
+
+              if (ts1 != ts2) {
+                // found a read lock that is not suppressed by a delete read lock entry
+                return;
+              }
+            } else {
               // found a read lock that is not suppressed by a delete read lock entry
               return;
             }
-          } else {
-            // found a read lock that is not suppressed by a delete read lock entry
-            return;
           }
+          source.next();
+          break;
+        }
+        case DATA:
+        case LOCK:
+        case ACK: {
+          source.skipColumn(source.getTopKey());
+          break;
         }
-        source.next();
-        continue;
-      } else if (colType == DATA_PREFIX || colType == LOCK_PREFIX || colType == ACK_PREFIX) {
-        source.skipColumn(source.getTopKey());
-        continue;
-      } else {
-        throw new IllegalArgumentException("Unknown column type " + source.getTopKey());
+        default:
+          throw new IllegalArgumentException("Unknown column type " + source.getTopKey());
       }
     }
   }
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java
index b6f9a485..f6de3f84 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -28,6 +28,7 @@
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.util.ReadLockUtil;
 import org.apache.fluo.accumulo.values.DelReadLockValue;
 import org.apache.fluo.accumulo.values.WriteValue;
@@ -102,9 +103,9 @@ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean i
 
     Key endKey = new Key(range.getStartKey());
     if (checkAck) {
-      endKey.setTimestamp(ColumnConstants.DATA_PREFIX | ColumnConstants.TIMESTAMP_MASK);
+      endKey.setTimestamp(ColumnType.DATA.first());
     } else {
-      endKey.setTimestamp(ColumnConstants.ACK_PREFIX | ColumnConstants.TIMESTAMP_MASK);
+      endKey.setTimestamp(ColumnType.ACK.first());
     }
 
     // Tried seeking directly to WRITE_PREFIX, however this did not work well because of how
@@ -120,114 +121,126 @@ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean i
     while (source.hasTop() && seekRange.getStartKey().equals(source.getTopKey(),
         PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
 
-      long colType = source.getTopKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
+      ColumnType colType = ColumnType.from(source.getTopKey());
       long ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
 
-      if (colType == ColumnConstants.TX_DONE_PREFIX) {
-        // tried to make 1st seek go to WRITE_PREFIX, but this did not allow the DeleteIterator to
-        // be removed from the stack so it was slower.
-        source.skipToPrefix(seekRange.getStartKey(), ColumnConstants.WRITE_PREFIX);
-      } else if (colType == ColumnConstants.WRITE_PREFIX) {
-        long timePtr = WriteValue.getTimestamp(source.getTopValue().get());
-
-        if (timePtr > invalidationTime) {
-          invalidationTime = timePtr;
+      switch (colType) {
+        case TX_DONE: {
+          // tried to make 1st seek go to WRITE_PREFIX, but this did not allow the DeleteIterator to
+          // be removed from the stack so it was slower.
+          source.skipToPrefix(seekRange.getStartKey(), ColumnType.WRITE);
+          break;
         }
+        case WRITE: {
+          long timePtr = WriteValue.getTimestamp(source.getTopValue().get());
 
-        if (ts >= snaptime) {
-          hasTop = true;
-          return;
-        }
-
-        source.skipToPrefix(seekRange.getStartKey(), ColumnConstants.DEL_LOCK_PREFIX);
-      } else if (colType == ColumnConstants.DEL_LOCK_PREFIX) {
-        if (ts > invalidationTime) {
-          invalidationTime = ts;
+          if (timePtr > invalidationTime) {
+            invalidationTime = timePtr;
+          }
 
           if (ts >= snaptime) {
             hasTop = true;
             return;
           }
-        }
 
-        if (readlock) {
-          source.skipToPrefix(seekRange.getStartKey(), ColumnConstants.LOCK_PREFIX);
-        } else {
-          source.skipToPrefix(seekRange.getStartKey(), ColumnConstants.RLOCK_PREFIX);
+          source.skipToPrefix(seekRange.getStartKey(), ColumnType.DEL_LOCK);
+          break;
         }
-      } else if (colType == ColumnConstants.RLOCK_PREFIX) {
+        case DEL_LOCK: {
+          if (ts > invalidationTime) {
+            invalidationTime = ts;
 
-        long lastDeleteTs = -1;
-        long rlts = ReadLockUtil.decodeTs(ts);
+            if (ts >= snaptime) {
+              hasTop = true;
+              return;
+            }
+          }
 
-        if (!readlock) {
-          while (rlts > invalidationTime && colType == ColumnConstants.RLOCK_PREFIX) {
-            if (ReadLockUtil.isDelete(ts)) {
-              // ignore rolled back read locks, these should never prevent a write lock
-              if (!DelReadLockValue.isRollback(source.getTopValue().get())) {
-                if (rlts >= snaptime) {
-                  hasTop = true;
-                  return;
-                } else {
-                  long rlockCommitTs =
-                      DelReadLockValue.getCommitTimestamp(source.getTopValue().get());
-                  if (rlockCommitTs > snaptime) {
+          if (readlock) {
+            source.skipToPrefix(seekRange.getStartKey(), ColumnType.LOCK);
+          } else {
+            source.skipToPrefix(seekRange.getStartKey(), ColumnType.RLOCK);
+          }
+          break;
+        }
+        case RLOCK: {
+          long lastDeleteTs = -1;
+          long rlts = ReadLockUtil.decodeTs(ts);
+
+          if (!readlock) {
+            while (rlts > invalidationTime && colType == ColumnType.RLOCK) {
+              if (ReadLockUtil.isDelete(ts)) {
+                // ignore rolled back read locks, these should never prevent a write lock
+                if (!DelReadLockValue.isRollback(source.getTopValue().get())) {
+                  if (rlts >= snaptime) {
                     hasTop = true;
                     return;
+                  } else {
+                    long rlockCommitTs =
+                        DelReadLockValue.getCommitTimestamp(source.getTopValue().get());
+                    if (rlockCommitTs > snaptime) {
+                      hasTop = true;
+                      return;
+                    }
                   }
                 }
-              }
 
 
-              lastDeleteTs = rlts;
-            } else {
-              if (rlts != lastDeleteTs) {
-                // this read lock is active
-                hasTop = true;
-                return;
+                lastDeleteTs = rlts;
+              } else {
+                if (rlts != lastDeleteTs) {
+                  // this read lock is active
+                  hasTop = true;
+                  return;
+                }
+              }
+
+              source.next();
+              if (source.hasTop()) {
+                colType = ColumnType.from(source.getTopKey());
+                ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
+                rlts = ReadLockUtil.decodeTs(ts);
+              } else {
+                break;
               }
             }
+          }
 
-            source.next();
-            if (source.hasTop()) {
-              colType = source.getTopKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
-              ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
-              rlts = ReadLockUtil.decodeTs(ts);
-            } else {
-              break;
-            }
+          if (source.hasTop() && (colType == ColumnType.RLOCK)) {
+            source.skipToPrefix(seekRange.getStartKey(), ColumnType.LOCK);
           }
+          break;
         }
+        case LOCK: {
+          if (ts > invalidationTime) {
+            // nothing supersedes this lock, therefore the column is locked
+            hasTop = true;
+            return;
+          }
 
-        if (source.hasTop() && (colType == ColumnConstants.RLOCK_PREFIX)) {
-          source.skipToPrefix(seekRange.getStartKey(), ColumnConstants.LOCK_PREFIX);
-        }
-      } else if (colType == ColumnConstants.LOCK_PREFIX) {
-        if (ts > invalidationTime) {
-          // nothing supersedes this lock, therefore the column is locked
-          hasTop = true;
-          return;
+          if (checkAck) {
+            source.skipToPrefix(seekRange.getStartKey(), ColumnType.ACK);
+          } else {
+            // only ack and data left and not interested in either so stop looking
+            return;
+          }
+          break;
         }
-
-        if (checkAck) {
-          source.skipToPrefix(seekRange.getStartKey(), ColumnConstants.ACK_PREFIX);
-        } else {
-          // only ack and data left and not interested in either so stop looking
+        case DATA: {
+          // can stop looking
           return;
         }
-      } else if (colType == ColumnConstants.DATA_PREFIX) {
-        // can stop looking
-        return;
-      } else if (colType == ColumnConstants.ACK_PREFIX) {
-        if (checkAck && ts > ntfyTimestamp) {
-          hasTop = true;
-          return;
-        } else {
-          // nothing else to look at in this column
-          return;
+        case ACK: {
+          if (checkAck && ts > ntfyTimestamp) {
+            hasTop = true;
+            return;
+          } else {
+            // nothing else to look at in this column
+            return;
+          }
         }
-      } else {
-        throw new IllegalArgumentException();
+        default:
+          throw new IllegalArgumentException();
       }
     }
   }
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java
index cae383f8..7df373f5 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -29,6 +29,7 @@
 import org.apache.accumulo.core.iterators.IteratorUtil;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.values.WriteValue;
 
 public class RollbackCheckIterator implements SortedKeyValueIterator<Key, Value> {
@@ -91,60 +92,69 @@ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean i
     hasTop = false;
     while (source.hasTop()
         && curCol.equals(source.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
-      long colType = source.getTopKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
+      ColumnType colType = ColumnType.from(source.getTopKey());
       long ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
 
-      if (colType == ColumnConstants.TX_DONE_PREFIX) {
-        source.skipToPrefix(curCol, ColumnConstants.WRITE_PREFIX);
-        continue;
-      } else if (colType == ColumnConstants.WRITE_PREFIX) {
-        long timePtr = WriteValue.getTimestamp(source.getTopValue().get());
-
-        if (timePtr > invalidationTime) {
-          invalidationTime = timePtr;
+      switch (colType) {
+        case TX_DONE:
+          source.skipToPrefix(curCol, ColumnType.WRITE);
+          continue;
+        case WRITE: {
+          long timePtr = WriteValue.getTimestamp(source.getTopValue().get());
+
+          if (timePtr > invalidationTime) {
+            invalidationTime = timePtr;
+          }
+
+          if (lockTime == timePtr) {
+            hasTop = true;
+            return;
+          }
+
+          if (lockTime > timePtr) {
+            source.skipToPrefix(curCol, ColumnType.DEL_LOCK);
+            continue;
+          }
+          break;
         }
-
-        if (lockTime == timePtr) {
-          hasTop = true;
-          return;
+        case DEL_LOCK: {
+          if (ts > invalidationTime) {
+            invalidationTime = ts;
+          }
+
+          if (ts == lockTime) {
+            hasTop = true;
+            return;
+          }
+
+          if (lockTime > ts) {
+            source.skipToPrefix(curCol, ColumnType.LOCK);
+            continue;
+          }
+          break;
         }
-
-        if (lockTime > timePtr) {
-          source.skipToPrefix(curCol, ColumnConstants.DEL_LOCK_PREFIX);
+        case RLOCK: {
+          source.skipToPrefix(curCol, ColumnType.LOCK);
           continue;
         }
-
-      } else if (colType == ColumnConstants.DEL_LOCK_PREFIX) {
-        if (ts > invalidationTime) {
-          invalidationTime = ts;
+        case LOCK: {
+          if (ts > invalidationTime) {
+            // nothing supersedes this lock, therefore the column is locked
+            hasTop = true;
+            return;
+          }
+          break;
         }
-
-        if (ts == lockTime) {
-          hasTop = true;
+        case DATA: {
+          // can stop looking
           return;
         }
-
-        if (lockTime > ts) {
-          source.skipToPrefix(curCol, ColumnConstants.LOCK_PREFIX);
-          continue;
-        }
-
-      } else if (colType == ColumnConstants.RLOCK_PREFIX) {
-        source.skipToPrefix(curCol, ColumnConstants.LOCK_PREFIX);
-        continue;
-      } else if (colType == ColumnConstants.LOCK_PREFIX) {
-        if (ts > invalidationTime) {
-          // nothing supersedes this lock, therefore the column is locked
-          hasTop = true;
-          return;
+        case ACK: {
+          // do nothing if ACK
+          break;
         }
-      } else if (colType == ColumnConstants.DATA_PREFIX) {
-        // can stop looking
-        return;
-      } else if (colType == ColumnConstants.ACK_PREFIX) {
-        // do nothing if ACK
-      } else {
-        throw new IllegalArgumentException();
+        default:
+          throw new IllegalArgumentException();
       }
 
       source.next();
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java
index 27e63ef4..4b5ab9d0 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -33,6 +33,7 @@
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.values.WriteValue;
 
 public class SnapshotIterator implements SortedKeyValueIterator<Key, Value> {
@@ -91,87 +92,95 @@ private void findTop() throws IOException {
 
       while (source.hasTop()
           && curCol.equals(source.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
-        long colType = source.getTopKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
+        ColumnType colType = ColumnType.from(source.getTopKey());
         long ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
 
-        if (colType == ColumnConstants.TX_DONE_PREFIX) {
-          source.skipToPrefix(curCol, ColumnConstants.WRITE_PREFIX);
-          continue;
-        } else if (colType == ColumnConstants.WRITE_PREFIX) {
-          long timePtr = WriteValue.getTimestamp(source.getTopValue().get());
-
-          if (timePtr > invalidationTime) {
-            invalidationTime = timePtr;
+        switch (colType) {
+          case TX_DONE: {
+            source.skipToPrefix(curCol, ColumnType.WRITE);
+            continue;
           }
+          case WRITE: {
+            long timePtr = WriteValue.getTimestamp(source.getTopValue().get());
 
-          if (dataPointer == -1) {
-            if (ts <= snaptime) {
-              dataPointer = timePtr;
-              source.skipToPrefix(curCol, ColumnConstants.DEL_LOCK_PREFIX);
-              continue;
+            if (timePtr > invalidationTime) {
+              invalidationTime = timePtr;
+            }
+
+            if (dataPointer == -1) {
+              if (ts <= snaptime) {
+                dataPointer = timePtr;
+                source.skipToPrefix(curCol, ColumnType.DEL_LOCK);
+                continue;
+              } else {
+                source.skipToTimestamp(curCol, ColumnType.WRITE.enode(snaptime));
+                continue;
+              }
+            }
+            break;
+          }
+          case DEL_LOCK: {
+            if (ts > invalidationTime) {
+              invalidationTime = ts;
+            }
+            if (returnReadLockPresent) {
+              source.skipToPrefix(curCol, ColumnType.RLOCK);
             } else {
-              source.skipToTimestamp(curCol, ColumnConstants.WRITE_PREFIX | snaptime);
-              continue;
+              source.skipToPrefix(curCol, ColumnType.LOCK);
             }
+            continue;
           }
-        } else if (colType == ColumnConstants.DEL_LOCK_PREFIX) {
-          if (ts > invalidationTime) {
-            invalidationTime = ts;
+          case RLOCK: {
+            if (returnReadLockPresent) {
+              rememberReadLock(source.getTopKey(), source.getTopValue());
+            }
+
+            source.skipToPrefix(curCol, ColumnType.LOCK);
+            continue;
           }
-          if (returnReadLockPresent) {
-            source.skipToPrefix(curCol, ColumnConstants.RLOCK_PREFIX);
-          } else {
-            source.skipToPrefix(curCol, ColumnConstants.LOCK_PREFIX);
+          case LOCK: {
+            if (ts > invalidationTime && ts <= snaptime) {
+              // nothing supersedes this lock, therefore the column is locked
+              return;
+            } else {
+              if (dataPointer == -1) {
+                source.skipColumn(curCol);
+                continue outer;
+              } else {
+                source.skipToTimestamp(curCol, ColumnType.DATA.enode(dataPointer));
+                continue;
+              }
+            }
           }
-          continue;
+          case DATA: {
+            if (dataPointer == ts) {
+              // found data for this column
+              return;
+            }
 
-        } else if (colType == ColumnConstants.RLOCK_PREFIX) {
-          if (returnReadLockPresent) {
-            rememberReadLock(source.getTopKey(), source.getTopValue());
-          }
+            if (ts < dataPointer || dataPointer == -1) {
+              source.skipColumn(curCol);
+              continue outer;
+            }
 
-          source.skipToPrefix(curCol, ColumnConstants.LOCK_PREFIX);
-          continue;
-        } else if (colType == ColumnConstants.LOCK_PREFIX) {
-          if (ts > invalidationTime && ts <= snaptime) {
-            // nothing supersedes this lock, therefore the column is locked
-            return;
-          } else {
+            if (ts > dataPointer) {
+              source.skipToTimestamp(curCol, ColumnType.DATA.enode(dataPointer));
+              continue;
+            }
+            break;
+          }
+          case ACK: {
             if (dataPointer == -1) {
               source.skipColumn(curCol);
               continue outer;
             } else {
-              source.skipToTimestamp(curCol, ColumnConstants.DATA_PREFIX | dataPointer);
+              source.skipToTimestamp(curCol, ColumnType.DATA.enode(dataPointer));
               continue;
             }
           }
-        } else if (colType == ColumnConstants.DATA_PREFIX) {
-          if (dataPointer == ts) {
-            // found data for this column
-            return;
-          }
-
-          if (ts < dataPointer || dataPointer == -1) {
-            source.skipColumn(curCol);
-            continue outer;
-          }
-
-          if (ts > dataPointer) {
-            source.skipToTimestamp(curCol, ColumnConstants.DATA_PREFIX | dataPointer);
-            continue;
-          }
-        } else if (colType == ColumnConstants.ACK_PREFIX) {
-          if (dataPointer == -1) {
-            source.skipColumn(curCol);
-            continue outer;
-          } else {
-            source.skipToTimestamp(curCol, ColumnConstants.DATA_PREFIX | dataPointer);
-            continue;
-          }
-        } else {
-          throw new IllegalArgumentException();
+          default:
+            throw new IllegalArgumentException();
         }
-
         // TODO handle case where dataPointer >=0, but no data was found
         source.next();
       }
@@ -220,8 +229,7 @@ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean i
     if (range.getStartKey() != null && range.getStartKey().getTimestamp() != Long.MAX_VALUE
         && !range.isStartKeyInclusive()) {
 
-      if ((range.getStartKey().getTimestamp()
-          & ColumnConstants.PREFIX_MASK) == ColumnConstants.RLOCK_PREFIX) {
+      if (ColumnType.from(range.getStartKey()) == ColumnType.RLOCK) {
         Key currCol = new Key(range.getStartKey());
         currCol.setTimestamp(Long.MAX_VALUE);
         newRange = new Range(currCol, true, range.getEndKey(), range.isEndKeyInclusive());
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java
index 3279ec2f..534c82da 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java
@@ -26,7 +26,7 @@
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 
 /**
  * The purpose of this iterator is to make seeking within a columns timestamp range efficient.
@@ -80,10 +80,8 @@ public void skipToTimestamp(Key curCol, long timestamp) throws IOException {
     }
   }
 
-  public void skipToPrefix(Key curCol, long prefix) throws IOException {
-    // first possible timestamp in sorted order for this prefix
-    long timestamp = prefix | ColumnConstants.TIMESTAMP_MASK;
-    skipToTimestamp(curCol, timestamp);
+  public void skipToPrefix(Key curCol, ColumnType colType) throws IOException {
+    skipToTimestamp(curCol, colType.first());
   }
 
   public void skipColumn(Key curCol) throws IOException {
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java
index 7063adcd..16b9f1ee 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java
@@ -21,20 +21,11 @@
  * Constants used extract data from columns
  */
 public class ColumnConstants {
-
-  public static final long PREFIX_MASK = 0xe000000000000000L;
-  public static final long TX_DONE_PREFIX = 0x6000000000000000L;
-  public static final long WRITE_PREFIX = 0x4000000000000000L;
-  public static final long DEL_LOCK_PREFIX = 0x2000000000000000L;
-  public static final long RLOCK_PREFIX = 0x0000000000000000L;
-  public static final long LOCK_PREFIX = 0xe000000000000000L;
-  public static final long ACK_PREFIX = 0xc000000000000000L;
-  public static final long DATA_PREFIX = 0xa000000000000000L;
-  public static final long TIMESTAMP_MASK = 0x1fffffffffffffffL;
+  public static final long PREFIX_MASK = -1L << (64 - ColumnType.BITS);
+  public static final long TIMESTAMP_MASK = -1L >>> ColumnType.BITS;
   public static final Bytes NOTIFY_CF = Bytes.of("ntfy");
   public static final String NOTIFY_LOCALITY_GROUP_NAME = "notify";
   public static final Bytes GC_CF = Bytes.of("gc");
 
   private ColumnConstants() {}
-
 }
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnType.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnType.java
new file mode 100644
index 00000000..b16fe6dc
--- /dev/null
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnType.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.accumulo.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.accumulo.core.data.Key;
+
+/**
+ * Abstracts how the Fluo column type is encoded in Accumulo timestamps.
+ */
+public enum ColumnType {
+
+  TX_DONE, WRITE, DEL_LOCK, RLOCK, LOCK, ACK, DATA;
+
+  private long prefix;
+
+  /**
+   * @return The first possible timestamp in sorted order.
+   */
+  public long first() {
+    return prefix | ColumnConstants.TIMESTAMP_MASK;
+  }
+
+  /**
+   * @return The timestamp with this column type encoded into the high order bits.
+   */
+  public long enode(long timestamp) {
+    Preconditions.checkArgument((timestamp >>> (64 - BITS)) == 0);
+    return prefix | timestamp;
+  }
+
+  // The number of leftmost bits in in the timestamp reserved for encoding the column type
+  static final int BITS = 3;
+  private static final byte TX_DONE_PREFIX = 0x03;
+  private static final byte WRITE_PREFIX = 0x02;
+  private static final byte DEL_LOCK_PREFIX = 0x01;
+  private static final byte RLOCK_PREFIX = 0x00;
+  private static final byte LOCK_PREFIX = 0x07;
+  private static final byte ACK_PREFIX = 0x06;
+  private static final byte DATA_PREFIX = 0x05;
+
+  static {
+    TX_DONE.prefix = (long) TX_DONE_PREFIX << (64 - BITS);
+    WRITE.prefix = (long) WRITE_PREFIX << (64 - BITS);
+    DEL_LOCK.prefix = (long) DEL_LOCK_PREFIX << (64 - BITS);
+    RLOCK.prefix = (long) RLOCK_PREFIX << (64 - BITS);
+    LOCK.prefix = (long) LOCK_PREFIX << (64 - BITS);
+    ACK.prefix = (long) ACK_PREFIX << (64 - BITS);
+    DATA.prefix = (long) DATA_PREFIX << (64 - BITS);
+  }
+
+  public static ColumnType from(Key k) {
+    return from(k.getTimestamp());
+  }
+
+  public static ColumnType from(long timestamp) {
+    byte prefix = (byte) (timestamp >>> (64 - BITS));
+    switch (prefix) {
+      case TX_DONE_PREFIX:
+        return TX_DONE;
+      case WRITE_PREFIX:
+        return WRITE;
+      case DEL_LOCK_PREFIX:
+        return DEL_LOCK;
+      case RLOCK_PREFIX:
+        return RLOCK;
+      case LOCK_PREFIX:
+        return LOCK;
+      case ACK_PREFIX:
+        return ACK;
+      case DATA_PREFIX:
+        return DATA;
+      default:
+        throw new IllegalArgumentException("Unknown prefix : " + Integer.toHexString(prefix));
+    }
+  }
+}
diff --git a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java
index 417fd810..7358e1ee 100644
--- a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java
+++ b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -26,7 +26,7 @@
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.SortedMapIterator;
-import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -320,6 +320,6 @@ public void testNegativeTime() {
 
   @Test(expected = IllegalArgumentException.class)
   public void testNonZeroPrefix() {
-    SnapshotIterator.setSnaptime(null, ColumnConstants.DATA_PREFIX | 6);
+    SnapshotIterator.setSnaptime(null, ColumnType.DATA.enode(6));
   }
 }
diff --git a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java
index dbdcd1b6..ad595e06 100644
--- a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java
+++ b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -29,7 +29,7 @@
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.fluo.accumulo.format.FluoFormatter;
 import org.apache.fluo.accumulo.iterators.CountingIterator.Counter;
-import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.util.ReadLockUtil;
 import org.apache.fluo.accumulo.values.DelLockValue;
 import org.apache.fluo.accumulo.values.DelReadLockValue;
@@ -101,28 +101,28 @@ public TestData addIfInRange(String key, String value, Range range) {
 
     switch (ct) {
       case "ACK":
-        ts |= ColumnConstants.ACK_PREFIX;
+        ts = ColumnType.ACK.enode(ts);
         break;
       case "TX_DONE":
-        ts |= ColumnConstants.TX_DONE_PREFIX;
+        ts = ColumnType.TX_DONE.enode(ts);
         break;
       case "WRITE":
-        ts |= ColumnConstants.WRITE_PREFIX;
+        ts = ColumnType.WRITE.enode(ts);
         long writeTs = Long.parseLong(value.split("\\s+")[0]);
         val = WriteValue.encode(writeTs, value.contains("PRIMARY"), value.contains("DELETE"));
         break;
       case "LOCK":
-        ts |= ColumnConstants.LOCK_PREFIX;
+        ts = ColumnType.LOCK.enode(ts);
         String rc[] = value.split("\\s+");
         val = LockValue.encode(Bytes.of(rc[0]), new Column(rc[1], rc[2]), value.contains("WRITE"),
             value.contains("DELETE"), value.contains("TRIGGER"), 42l);
         break;
       case "DATA":
-        ts |= ColumnConstants.DATA_PREFIX;
+        ts = ColumnType.DATA.enode(ts);
         val = value.getBytes();
         break;
       case "DEL_LOCK":
-        ts |= ColumnConstants.DEL_LOCK_PREFIX;
+        ts = ColumnType.DEL_LOCK.enode(ts);
         if (value.contains("ROLLBACK") || value.contains("ABORT")) {
           val = DelLockValue.encodeRollback(value.contains("PRIMARY"), true);
         } else {
@@ -132,11 +132,11 @@ public TestData addIfInRange(String key, String value, Range range) {
         break;
       case "RLOCK":
         ts = ReadLockUtil.encodeTs(ts, false);
-        ts |= ColumnConstants.RLOCK_PREFIX;
+        ts = ColumnType.RLOCK.enode(ts);
         break;
       case "DEL_RLOCK":
         ts = ReadLockUtil.encodeTs(ts, true);
-        ts |= ColumnConstants.RLOCK_PREFIX;
+        ts = ColumnType.RLOCK.enode(ts);
 
         if (value.contains("ROLLBACK") || value.contains("ABORT")) {
           val = DelReadLockValue.encodeRollback();
diff --git a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/util/ColumnTypeTest.java b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/util/ColumnTypeTest.java
new file mode 100644
index 00000000..3b0b09bb
--- /dev/null
+++ b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/util/ColumnTypeTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.accumulo.util;
+
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
+import org.apache.accumulo.core.data.Key;
+import org.junit.Test;
+
+import static org.apache.fluo.accumulo.util.ColumnType.ACK;
+import static org.apache.fluo.accumulo.util.ColumnType.DATA;
+import static org.apache.fluo.accumulo.util.ColumnType.DEL_LOCK;
+import static org.apache.fluo.accumulo.util.ColumnType.LOCK;
+import static org.apache.fluo.accumulo.util.ColumnType.RLOCK;
+import static org.apache.fluo.accumulo.util.ColumnType.TX_DONE;
+import static org.apache.fluo.accumulo.util.ColumnType.WRITE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ColumnTypeTest {
+
+  private static final long TIMESTAMP_MASK = 0x1fffffffffffffffL;
+
+
+  // map of expected prefixes for a column type
+  private static final Map<Long, ColumnType> EPM;
+
+  static {
+    Builder<Long, ColumnType> builder = ImmutableMap.builder();
+    builder.put(0x6000000000000000L, TX_DONE);
+    builder.put(0x4000000000000000L, WRITE);
+    builder.put(0x2000000000000000L, DEL_LOCK);
+    builder.put(0x0000000000000000L, RLOCK);
+    builder.put(0xe000000000000000L, LOCK);
+    builder.put(0xc000000000000000L, ACK);
+    builder.put(0xa000000000000000L, DATA);
+    EPM = builder.build();
+  }
+
+  @Test
+  public void testPrefix() {
+    for (long l : new long[] {0, 2, 13, 19 * 19L, 1L << 50, 1L << 50 + 1L << 48}) {
+      EPM.forEach((prefix, colType) -> assertEquals(prefix | l, colType.enode(l)));
+    }
+  }
+
+  @Test
+  public void testFirst() {
+    EPM.forEach((prefix, colType) -> assertEquals(prefix | TIMESTAMP_MASK, colType.first()));
+    for (long l : new long[] {0, 2, 13, 19 * 19L, 1L << 50, 1L << 50 + 1L << 48}) {
+      EPM.forEach((prefix, colType) -> {
+        Key k1 = new Key("r", "f", "q");
+        k1.setTimestamp(prefix | l);
+        Key k2 = new Key("r", "f", "q");
+        k2.setTimestamp(colType.first());
+        assertTrue(k1.compareTo(k2) > 0);
+      });
+    }
+  }
+
+  @Test
+  public void testFrom() {
+    for (long l : new long[] {0, 2, 13, 19 * 19L, 1L << 50, 1L << 50 + 1L << 48}) {
+      EPM.forEach((prefix, colType) -> {
+        assertEquals(ColumnType.from(prefix | l), colType);
+        Key k = new Key("r", "f", "q");
+        k.setTimestamp(prefix | l);
+        assertEquals(ColumnType.from(k), colType);
+      });
+    }
+  }
+
+  @Test
+  public void testCoverage() {
+    EnumSet<ColumnType> expected = EnumSet.allOf(ColumnType.class);
+    HashSet<ColumnType> actual = new HashSet<>(EPM.values());
+    assertEquals(expected, actual);
+  }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
index ea47bbfb..54d2fa03 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
@@ -36,6 +36,7 @@
 import org.apache.fluo.accumulo.iterators.OpenReadLockIterator;
 import org.apache.fluo.accumulo.iterators.PrewriteIterator;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.util.ReadLockUtil;
 import org.apache.fluo.accumulo.values.DelLockValue;
 import org.apache.fluo.accumulo.values.DelReadLockValue;
@@ -50,7 +51,6 @@
 import org.apache.fluo.core.util.FluoCondition;
 import org.apache.fluo.core.util.SpanUtil;
 
-import static org.apache.fluo.accumulo.util.ColumnConstants.PREFIX_MASK;
 import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
 
 /**
@@ -99,7 +99,7 @@
     public LockInfo(Entry<Key, Value> kve) {
       long rawTs = kve.getKey().getTimestamp();
       this.entry = kve;
-      if ((rawTs & ColumnConstants.PREFIX_MASK) == ColumnConstants.RLOCK_PREFIX) {
+      if (ColumnType.from(rawTs) == ColumnType.RLOCK) {
         this.lockTs = ReadLockUtil.decodeTs(rawTs);
         ReadLockValue rlv = new ReadLockValue(kve.getValue().get());
         this.prow = rlv.getPrimaryRow();
@@ -221,11 +221,11 @@ private static void rollback(Environment env, long startTs, PrimaryRowColumn prc
       if (lockInfo.isReadLock) {
         mut.put(k.getColumnFamilyData().toArray(), k.getColumnQualifierData().toArray(),
             k.getColumnVisibilityParsed(),
-            ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(lockInfo.lockTs, true),
+            ColumnType.RLOCK.enode(ReadLockUtil.encodeTs(lockInfo.lockTs, true)),
             DelReadLockValue.encodeRollback());
       } else {
         mut.put(k.getColumnFamilyData().toArray(), k.getColumnQualifierData().toArray(),
-            k.getColumnVisibilityParsed(), ColumnConstants.DEL_LOCK_PREFIX | lockInfo.lockTs,
+            k.getColumnVisibilityParsed(), ColumnType.DEL_LOCK.enode(lockInfo.lockTs),
             DelLockValue.encodeRollback(false, true));
       }
     }
@@ -241,7 +241,7 @@ private static boolean rollbackPrimary(Environment env, long startTs, PrimaryRow
     ConditionalFlutation delLockMutation = new ConditionalFlutation(env, prc.prow,
         new FluoCondition(env, prc.pcol).setIterators(iterConf).setValue(lockValue));
 
-    delLockMutation.put(prc.pcol, ColumnConstants.DEL_LOCK_PREFIX | prc.startTs,
+    delLockMutation.put(prc.pcol, ColumnType.DEL_LOCK.enode(prc.startTs),
         DelLockValue.encodeRollback(true, true));
 
     ConditionalWriter cw = null;
@@ -312,7 +312,7 @@ private static boolean isPrimary(PrimaryRowColumn prc, Key k) {
       for (Column col : e1.getValue()) {
         Key start = SpanUtil.toKey(new RowColumn(e1.getKey(), col));
         Key end = new Key(start);
-        end.setTimestamp(ColumnConstants.LOCK_PREFIX | ColumnConstants.TIMESTAMP_MASK);
+        end.setTimestamp(ColumnType.LOCK.first());
         ranges.add(new Range(start, true, end, false));
       }
     }
@@ -329,7 +329,7 @@ private static boolean isPrimary(PrimaryRowColumn prc, Key k) {
 
       List<Entry<Key, Value>> ret = new ArrayList<>();
       for (Entry<Key, Value> entry : bscanner) {
-        if ((entry.getKey().getTimestamp() & PREFIX_MASK) == ColumnConstants.RLOCK_PREFIX) {
+        if (ColumnType.from(entry.getKey()) == ColumnType.RLOCK) {
           ret.add(entry);
         }
       }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java b/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
index f9f1e85a..37dc45b8 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -32,7 +32,7 @@
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
-import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumn;
@@ -176,16 +176,20 @@ private void scan(Map<Bytes, Map<Column, Bytes>> ret, List<Entry<Key, Value>> lo
         Bytes row = rowConverter.apply(entry.getKey().getRowData());
         Column col = columnConverter.apply(entry.getKey());
 
-        long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
-
-        if (colType == ColumnConstants.LOCK_PREFIX) {
-          locks.add(entry);
-        } else if (colType == ColumnConstants.DATA_PREFIX) {
-          ret.computeIfAbsent(row, k -> new HashMap<>()).put(col, Bytes.of(entry.getValue().get()));
-        } else if (colType == ColumnConstants.RLOCK_PREFIX) {
-          readLocksSeen.computeIfAbsent(row, k -> new HashSet<>()).add(col);
-        } else {
-          throw new IllegalArgumentException("Unexpected column type " + colType);
+        ColumnType colType = ColumnType.from(entry.getKey());
+        switch (colType) {
+          case LOCK:
+            locks.add(entry);
+            break;
+          case DATA:
+            ret.computeIfAbsent(row, k -> new HashMap<>()).put(col,
+                Bytes.of(entry.getValue().get()));
+            break;
+          case RLOCK:
+            readLocksSeen.computeIfAbsent(row, k -> new HashSet<>()).add(col);
+            break;
+          default:
+            throw new IllegalArgumentException("Unexpected column type " + colType);
         }
       }
     } finally {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
index 5db76025..f8f83f72 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -31,7 +31,7 @@
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.fluo.accumulo.iterators.SnapshotIterator;
-import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.api.data.Span;
@@ -172,9 +172,7 @@ public void resolveLock(Entry<Key, Value> lockEntry) {
         while (iterator.hasNext()) {
           Entry<Key, Value> entry = iterator.next();
 
-          long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
-
-          if (colType == ColumnConstants.LOCK_PREFIX) {
+          if (ColumnType.from(entry.getKey()) == ColumnType.LOCK) {
             locks.add(entry);
             locksSeen.accept(lockEntry);
           }
@@ -220,18 +218,19 @@ public void resolveLock(Entry<Key, Value> lockEntry) {
 
         Entry<Key, Value> entry = iterator.next();
 
-        long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
-
-        if (colType == ColumnConstants.LOCK_PREFIX) {
-          resolveLock(entry);
-          continue mloop;
-        } else if (colType == ColumnConstants.DATA_PREFIX) {
-          stats.incrementEntriesReturned(1);
-          return entry;
-        } else if (colType == ColumnConstants.RLOCK_PREFIX) {
-          return entry;
-        } else {
-          throw new IllegalArgumentException("Unexpected column type " + colType);
+        ColumnType colType = ColumnType.from(entry.getKey());
+
+        switch (colType) {
+          case LOCK:
+            resolveLock(entry);
+            continue mloop;
+          case DATA:
+            stats.incrementEntriesReturned(1);
+            return entry;
+          case RLOCK:
+            return entry;
+          default:
+            throw new IllegalArgumentException("Unexpected column type " + colType);
         }
       }
     }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index ae87a24e..3cbbe5cd 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -52,6 +52,7 @@
 import org.apache.accumulo.core.data.Value;
 import org.apache.fluo.accumulo.iterators.PrewriteIterator;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.util.ReadLockUtil;
 import org.apache.fluo.accumulo.values.DelLockValue;
 import org.apache.fluo.accumulo.values.DelReadLockValue;
@@ -87,8 +88,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.fluo.accumulo.util.ColumnConstants.PREFIX_MASK;
-import static org.apache.fluo.accumulo.util.ColumnConstants.RLOCK_PREFIX;
 import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
 import static org.apache.fluo.api.observer.Observer.NotificationType.WEAK;
 
@@ -278,7 +277,7 @@ public Bytes get(Bytes row, Column column) {
         continue;
       }
 
-      if ((kve.getKey().getTimestamp() & PREFIX_MASK) == RLOCK_PREFIX) {
+      if (ColumnType.from(kve.getKey()) == ColumnType.RLOCK) {
         if (readLockCols == null) {
           readLockCols = readLocksSeen.computeIfAbsent(row, k -> new HashSet<>());
         }
@@ -407,14 +406,14 @@ private ConditionalFlutation prewrite(ConditionalFlutation cm, Bytes row, Column
     }
 
     if (isWrite(val) && !isDelete(val)) {
-      cm.put(col, ColumnConstants.DATA_PREFIX | startTs, val.toArray());
+      cm.put(col, ColumnType.DATA.enode(startTs), val.toArray());
     }
 
     if (isReadLock(val)) {
-      cm.put(col, ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(startTs, false),
+      cm.put(col, ColumnType.RLOCK.enode(ReadLockUtil.encodeTs(startTs, false)),
           ReadLockValue.encode(primaryRow, primaryColumn, getTransactorID()));
     } else {
-      cm.put(col, ColumnConstants.LOCK_PREFIX | startTs, LockValue.encode(primaryRow, primaryColumn,
+      cm.put(col, ColumnType.LOCK.enode(startTs), LockValue.encode(primaryRow, primaryColumn,
           isWrite(val), isDelete(val), isTriggerRow, getTransactorID()));
     }
 
@@ -668,11 +667,10 @@ private boolean checkForAckCollision(ConditionalMutation cm) {
         if (notification.getColumn().equals(col)) {
           // check to see if ACK exist after notification
           Key startKey = SpanUtil.toKey(notification.getRowColumn());
-          startKey.setTimestamp(
-              ColumnConstants.ACK_PREFIX | (Long.MAX_VALUE & ColumnConstants.TIMESTAMP_MASK));
+          startKey.setTimestamp(ColumnType.ACK.first());
 
           Key endKey = SpanUtil.toKey(notification.getRowColumn());
-          endKey.setTimestamp(ColumnConstants.ACK_PREFIX | (notification.getTimestamp() + 1));
+          endKey.setTimestamp(ColumnType.ACK.enode(notification.getTimestamp() + 1));
 
           Range range = new Range(startKey, endKey);
 
@@ -1112,11 +1110,10 @@ public boolean processResults(CommitData cd, Iterator<Result> results) throws Ex
         m = new Flutation(env, row);
         for (Entry<Column, Bytes> entry : updates.get(row).entrySet()) {
           if (isReadLock(entry.getValue())) {
-            m.put(entry.getKey(),
-                ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(startTs, true),
+            m.put(entry.getKey(), ColumnType.RLOCK.enode(ReadLockUtil.encodeTs(startTs, true)),
                 DelReadLockValue.encodeRollback());
           } else {
-            m.put(entry.getKey(), ColumnConstants.DEL_LOCK_PREFIX | startTs,
+            m.put(entry.getKey(), ColumnType.DEL_LOCK.enode(startTs),
                 DelLockValue.encodeRollback(false, true));
           }
         }
@@ -1134,9 +1131,9 @@ public boolean processResults(CommitData cd, Iterator<Result> results) throws Ex
       // mark transaction as complete for garbage collection purposes
       Flutation m = new Flutation(env, cd.prow);
 
-      m.put(cd.pcol, ColumnConstants.DEL_LOCK_PREFIX | startTs,
+      m.put(cd.pcol, ColumnType.DEL_LOCK.enode(startTs),
           DelLockValue.encodeRollback(startTs, true, true));
-      m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | startTs, EMPTY);
+      m.put(cd.pcol, ColumnType.TX_DONE.enode(startTs), EMPTY);
 
       return Collections.singletonList(m);
     }
@@ -1392,7 +1389,7 @@ public boolean finishCommit(CommitData cd, Stamp commitStamp) {
 
       Flutation m = new Flutation(env, cd.prow);
       // mark transaction as complete for garbage collection purposes
-      m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | commitTs, EMPTY);
+      m.put(cd.pcol, ColumnType.TX_DONE.enode(commitTs), EMPTY);
       afterFlushMutations.add(m);
 
       if (weakNotification != null) {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TxInfo.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TxInfo.java
index 849e2b5a..2b6126dc 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TxInfo.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TxInfo.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -22,6 +22,7 @@
 import org.apache.accumulo.core.data.Value;
 import org.apache.fluo.accumulo.iterators.RollbackCheckIterator;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.values.DelLockValue;
 import org.apache.fluo.accumulo.values.WriteValue;
 import org.apache.fluo.api.data.Bytes;
@@ -51,43 +52,50 @@ public static TxInfo getTransactionInfo(Environment env, Bytes prow, Column pcol
       return txInfo;
     }
 
-    long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
+    ColumnType colType = ColumnType.from(entry.getKey());
     long ts = entry.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
 
-    if (colType == ColumnConstants.LOCK_PREFIX) {
-      if (ts == startTs) {
-        txInfo.status = TxStatus.LOCKED;
-        txInfo.lockValue = entry.getValue().get();
-      } else {
-        txInfo.status = TxStatus.UNKNOWN; // locked by another tx
+    switch (colType) {
+      case LOCK: {
+        if (ts == startTs) {
+          txInfo.status = TxStatus.LOCKED;
+          txInfo.lockValue = entry.getValue().get();
+        } else {
+          txInfo.status = TxStatus.UNKNOWN; // locked by another tx
+        }
+        break;
       }
-    } else if (colType == ColumnConstants.DEL_LOCK_PREFIX) {
-      DelLockValue dlv = new DelLockValue(entry.getValue().get());
-
-      if (ts != startTs) {
-        // expect this to always be false, must be a bug in the iterator
-        throw new IllegalStateException(prow + " " + pcol + " (" + ts + " != " + startTs + ") ");
+      case DEL_LOCK: {
+        DelLockValue dlv = new DelLockValue(entry.getValue().get());
+
+        if (ts != startTs) {
+          // expect this to always be false, must be a bug in the iterator
+          throw new IllegalStateException(prow + " " + pcol + " (" + ts + " != " + startTs + ") ");
+        }
+
+        if (dlv.isRollback()) {
+          txInfo.status = TxStatus.ROLLED_BACK;
+        } else {
+          txInfo.status = TxStatus.COMMITTED;
+          txInfo.commitTs = dlv.getCommitTimestamp();
+        }
+        break;
       }
+      case WRITE: {
+        long timePtr = WriteValue.getTimestamp(entry.getValue().get());
 
-      if (dlv.isRollback()) {
-        txInfo.status = TxStatus.ROLLED_BACK;
-      } else {
-        txInfo.status = TxStatus.COMMITTED;
-        txInfo.commitTs = dlv.getCommitTimestamp();
-      }
-    } else if (colType == ColumnConstants.WRITE_PREFIX) {
-      long timePtr = WriteValue.getTimestamp(entry.getValue().get());
+        if (timePtr != startTs) {
+          // expect this to always be false, must be a bug in the iterator
+          throw new IllegalStateException(
+              prow + " " + pcol + " (" + timePtr + " != " + startTs + ") ");
+        }
 
-      if (timePtr != startTs) {
-        // expect this to always be false, must be a bug in the iterator
-        throw new IllegalStateException(
-            prow + " " + pcol + " (" + timePtr + " != " + startTs + ") ");
+        txInfo.status = TxStatus.COMMITTED;
+        txInfo.commitTs = ts;
+        break;
       }
-
-      txInfo.status = TxStatus.COMMITTED;
-      txInfo.commitTs = ts;
-    } else {
-      throw new IllegalStateException("unexpected col type returned " + colType);
+      default:
+        throw new IllegalStateException("unexpected col type returned " + colType);
     }
 
     return txInfo;
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java b/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
index bbf1d83d..9ea51304 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -29,7 +29,7 @@
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.util.ReadLockUtil;
 import org.apache.fluo.accumulo.values.DelLockValue;
 import org.apache.fluo.accumulo.values.DelReadLockValue;
@@ -56,19 +56,18 @@ public static void commitColumn(Environment env, boolean isTrigger, boolean isPr
       boolean isWrite, boolean isDelete, boolean isReadlock, long startTs, long commitTs,
       Set<Column> observedColumns, Mutation m) {
     if (isReadlock) {
-      Flutation.put(env, m, col,
-          ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(startTs, true),
+      Flutation.put(env, m, col, ColumnType.RLOCK.enode(ReadLockUtil.encodeTs(startTs, true)),
           DelReadLockValue.encodeCommit(commitTs));
     } else if (isWrite) {
-      Flutation.put(env, m, col, ColumnConstants.WRITE_PREFIX | commitTs,
+      Flutation.put(env, m, col, ColumnType.WRITE.enode(commitTs),
           WriteValue.encode(startTs, isPrimary, isDelete));
     } else {
-      Flutation.put(env, m, col, ColumnConstants.DEL_LOCK_PREFIX | startTs,
+      Flutation.put(env, m, col, ColumnType.DEL_LOCK.enode(startTs),
           DelLockValue.encodeCommit(commitTs, isPrimary));
     }
 
     if (isTrigger) {
-      Flutation.put(env, m, col, ColumnConstants.ACK_PREFIX | startTs, TransactionImpl.EMPTY);
+      Flutation.put(env, m, col, ColumnType.ACK.enode(startTs), TransactionImpl.EMPTY);
     }
   }
 
diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FailureIT.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FailureIT.java
index 546cf35c..3b475bf5 100644
--- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FailureIT.java
+++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FailureIT.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -26,6 +26,7 @@
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.util.LongUtil;
 import org.apache.fluo.accumulo.util.ZookeeperUtil;
 import org.apache.fluo.accumulo.values.DelLockValue;
@@ -638,12 +639,12 @@ private boolean wasRolledBackPrimary(long startTs, String rolledBackRow)
     Scanner scanner = aClient.createScanner(getCurTableName(), Authorizations.EMPTY);
 
     for (Entry<Key, Value> entry : scanner) {
-      long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
+      ColumnType colType = ColumnType.from(entry.getKey());
       long ts = entry.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
       String row = entry.getKey().getRowData().toString();
       byte[] val = entry.getValue().get();
 
-      if (row.equals(rolledBackRow) && colType == ColumnConstants.DEL_LOCK_PREFIX && ts == startTs
+      if (row.equals(rolledBackRow) && colType == ColumnType.DEL_LOCK && ts == startTs
           && DelLockValue.isPrimary(val)) {
         sawExpected = true;
       }
diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java
index b99b16d6..03ff986f 100644
--- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java
+++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -27,6 +27,7 @@
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.fluo.accumulo.format.FluoFormatter;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.util.ZookeeperPath;
 import org.apache.fluo.accumulo.util.ZookeeperUtil;
 import org.apache.fluo.api.client.TransactionBase;
@@ -308,10 +309,10 @@ private void verify(long oldestTs) throws TableNotFoundException {
         numWrites = 0;
       }
 
-      long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
+      ColumnType colType = ColumnType.from(entry.getKey());
       long ts = entry.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
 
-      if (colType == ColumnConstants.WRITE_PREFIX) {
+      if (colType == ColumnType.WRITE) {
         numWrites++;
         if (numWrites > 1) {
           Assert.assertTrue("Extra write had ts " + ts + " < " + oldestTs, ts >= oldestTs);
diff --git a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoKeyValueGenerator.java b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoKeyValueGenerator.java
index 1e959ff5..3fa156c4 100644
--- a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoKeyValueGenerator.java
+++ b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoKeyValueGenerator.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -19,7 +19,7 @@
 
 import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
 import org.apache.accumulo.core.data.Key;
-import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.values.WriteValue;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
@@ -45,14 +45,14 @@
  *   FluoKeyValueGenerator fkvg = new FluoKeyValueGenerator();
  *   // could also reuse column objects.
  *   Column column = new Column("fam1", "fam2");
- * 
+ *
  *   fkvg.setRow("row1").setColumn(column).setValue("val2");
- * 
+ *
  *   for (FluoKeyValue fluoKeyValue : fkvg.getKeyValues())
  *     writeToAccumuloFile(fluoKeyValue);
- * 
+ *
  *   fkvg.setRow("row2").setColumn(column).setValue("val3");
- * 
+ *
  *   // Each call to getKeyValues() returns the same objects populated with different data when
  *   // possible. So subsequent calls to getKeyValues() will create less objects. Of course this
  *   // invalidates what was returned by previous calls to getKeyValues().
@@ -187,11 +187,11 @@ public FluoKeyValueGenerator set(RowColumnValue rcv) {
    */
   public FluoKeyValue[] getKeyValues() {
     FluoKeyValue kv = keyVals[0];
-    kv.setKey(new Key(row, fam, qual, vis, ColumnConstants.WRITE_PREFIX | 1));
+    kv.setKey(new Key(row, fam, qual, vis, ColumnType.WRITE.enode(1)));
     kv.getValue().set(WriteValue.encode(0, false, false));
 
     kv = keyVals[1];
-    kv.setKey(new Key(row, fam, qual, vis, ColumnConstants.DATA_PREFIX | 0));
+    kv.setKey(new Key(row, fam, qual, vis, ColumnType.DATA.enode(0)));
     kv.getValue().set(val);
 
     return keyVals;
diff --git a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoMutationGenerator.java b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoMutationGenerator.java
index dd813abc..e09f20e5 100644
--- a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoMutationGenerator.java
+++ b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoMutationGenerator.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -20,7 +20,7 @@
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.values.WriteValue;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
@@ -83,9 +83,8 @@ public FluoMutationGenerator put(Column col, Bytes value) {
   }
 
   public FluoMutationGenerator put(Column col, byte[] value) {
-    Flutation.put(mutation, col, ColumnConstants.DATA_PREFIX | 0, value);
-    Flutation.put(mutation, col, ColumnConstants.WRITE_PREFIX | 1,
-        WriteValue.encode(0, false, false));
+    Flutation.put(mutation, col, ColumnType.DATA.enode(0), value);
+    Flutation.put(mutation, col, ColumnType.WRITE.enode(1), WriteValue.encode(0, false, false));
     return this;
   }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services