You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by wc...@apache.org on 2020/05/12 15:27:19 UTC

[hbase] branch branch-1 updated: HBASE-24351 Backport HBASE-24302 to branch-1 (#1693)

This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new f57b3d6  HBASE-24351 Backport HBASE-24302 to branch-1 (#1693)
f57b3d6 is described below

commit f57b3d68c75bf707b3095d48d7620448c1be6e8d
Author: Wellington Ramos Chevreuil <wc...@apache.org>
AuthorDate: Tue May 12 16:27:05 2020 +0100

    HBASE-24351 Backport HBASE-24302 to branch-1 (#1693)
    
    Signed-off-by: Viraj Jasani <vj...@apache.org>
---
 .../apache/hadoop/hbase/mapreduce/HashTable.java   |  55 +++++++----
 .../apache/hadoop/hbase/mapreduce/SyncTable.java   |  38 +++++++-
 .../hadoop/hbase/mapreduce/TestSyncTable.java      | 104 +++++++++++++--------
 3 files changed, 134 insertions(+), 63 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
index 43c72c4..0657e45 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
@@ -72,6 +72,7 @@ public class HashTable extends Configured implements Tool {
   final static String MANIFEST_FILE_NAME = "manifest";
   final static String HASH_DATA_DIR = "hashes";
   final static String OUTPUT_DATA_FILE_PREFIX = "part-r-";
+  final static String IGNORE_TIMESTAMPS = "ignoreTimestamps";
   private final static String TMP_MANIFEST_FILE_NAME = "manifest.tmp";
   
   TableHash tableHash = new TableHash();
@@ -95,6 +96,7 @@ public class HashTable extends Configured implements Tool {
     int versions = -1;
     long startTime = 0;
     long endTime = 0;
+    boolean ignoreTimestamps;
     
     List<ImmutableBytesWritable> partitions;
     
@@ -433,6 +435,7 @@ public class HashTable extends Configured implements Tool {
           getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName));
     Configuration jobConf = job.getConfiguration();
     jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
+    jobConf.setBoolean(IGNORE_TIMESTAMPS, tableHash.ignoreTimestamps);
     job.setJarByClass(HashTable.class);
 
     TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
@@ -470,6 +473,7 @@ public class HashTable extends Configured implements Tool {
     private ImmutableBytesWritable batchStartKey;
     private ImmutableBytesWritable batchHash;
     private long batchSize = 0;
+    boolean ignoreTimestamps;
     
     
     public ResultHasher() {
@@ -503,9 +507,11 @@ public class HashTable extends Configured implements Tool {
         digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength);
         digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength);
         long ts = cell.getTimestamp();
-        for (int i = 8; i > 0; i--) {
-          digest.update((byte) ts);
-          ts >>>= 8;
+        if(!ignoreTimestamps) {
+          for (int i = 8; i > 0; i--) {
+            digest.update((byte) ts);
+            ts >>>= 8;
+          }
         }
         digest.update(cell.getValueArray(), cell.getValueOffset(), valueLength);
         
@@ -551,7 +557,8 @@ public class HashTable extends Configured implements Tool {
       targetBatchSize = context.getConfiguration()
           .getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
       hasher = new ResultHasher();
-      
+      hasher.ignoreTimestamps = context.getConfiguration().
+        getBoolean(IGNORE_TIMESTAMPS, false);
       TableSplit split = (TableSplit) context.getInputSplit();
       hasher.startBatch(new ImmutableBytesWritable(split.getStartRow()));
     }
@@ -602,21 +609,24 @@ public class HashTable extends Configured implements Tool {
     System.err.println("Usage: HashTable [options] <tablename> <outputpath>");
     System.err.println();
     System.err.println("Options:");
-    System.err.println(" batchsize     the target amount of bytes to hash in each batch");
-    System.err.println("               rows are added to the batch until this size is reached");
-    System.err.println("               (defaults to " + DEFAULT_BATCH_SIZE + " bytes)");
-    System.err.println(" numhashfiles  the number of hash files to create");
-    System.err.println("               if set to fewer than number of regions then");
-    System.err.println("               the job will create this number of reducers");
-    System.err.println("               (defaults to 1/100 of regions -- at least 1)");
-    System.err.println(" startrow      the start row");
-    System.err.println(" stoprow       the stop row");
-    System.err.println(" starttime     beginning of the time range (unixtime in millis)");
-    System.err.println("               without endtime means from starttime to forever");
-    System.err.println(" endtime       end of the time range.  Ignored if no starttime specified.");
-    System.err.println(" scanbatch     scanner batch size to support intra row scans");
-    System.err.println(" versions      number of cell versions to include");
-    System.err.println(" families      comma-separated list of families to include");
+    System.err.println(" batchsize         the target amount of bytes to hash in each batch");
+    System.err.println("                   rows are added to the batch until this size is reached");
+    System.err.println("                   (defaults to " + DEFAULT_BATCH_SIZE + " bytes)");
+    System.err.println(" numhashfiles      the number of hash files to create");
+    System.err.println("                   if set to fewer than number of regions then");
+    System.err.println("                   the job will create this number of reducers");
+    System.err.println("                   (defaults to 1/100 of regions -- at least 1)");
+    System.err.println(" startrow          the start row");
+    System.err.println(" stoprow           the stop row");
+    System.err.println(" starttime         beginning of the time range (unixtime in millis)");
+    System.err.println("                   without endtime means from starttime to forever");
+    System.err.println(" endtime           end of the time range.");
+    System.err.println("                   Ignored if no starttime specified.");
+    System.err.println(" scanbatch         scanner batch size to support intra row scans");
+    System.err.println(" versions          number of cell versions to include");
+    System.err.println(" families          comma-separated list of families to include");
+    System.err.println(" ignoreTimestamps  if true, ignores cell timestamps");
+    System.err.println("                   when calculating hashes");
     System.err.println();
     System.err.println("Args:");
     System.err.println(" tablename     Name of the table to hash");
@@ -701,6 +711,13 @@ public class HashTable extends Configured implements Tool {
           continue;
         }
 
+        final String ignoreTimestampsKey = "--ignoreTimestamps=";
+        if (cmd.startsWith(ignoreTimestampsKey)) {
+          tableHash.ignoreTimestamps = Boolean.
+            parseBoolean(cmd.substring(ignoreTimestampsKey.length()));
+          continue;
+        }
+
         printUsage("Invalid argument '" + cmd + "'");
         return false;
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
index 03c5e1a..2fb256e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -66,6 +67,7 @@ public class SyncTable extends Configured implements Tool {
   static final String DRY_RUN_CONF_KEY = "sync.table.dry.run";
   static final String DO_DELETES_CONF_KEY = "sync.table.do.deletes";
   static final String DO_PUTS_CONF_KEY = "sync.table.do.puts";
+  static final String IGNORE_TIMESTAMPS = "sync.table.ignore.timestamps";
 
   Path sourceHashDir;
   String sourceTableName;
@@ -76,6 +78,7 @@ public class SyncTable extends Configured implements Tool {
   boolean dryRun;
   boolean doDeletes = true;
   boolean doPuts = true;
+  boolean ignoreTimestamps;
 
   Counters counters;
 
@@ -149,6 +152,7 @@ public class SyncTable extends Configured implements Tool {
     jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);
     jobConf.setBoolean(DO_DELETES_CONF_KEY, doDeletes);
     jobConf.setBoolean(DO_PUTS_CONF_KEY, doPuts);
+    jobConf.setBoolean(IGNORE_TIMESTAMPS, ignoreTimestamps);
 
     TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(),
         SyncMapper.class, null, null, job);
@@ -185,6 +189,7 @@ public class SyncTable extends Configured implements Tool {
     boolean dryRun;
     boolean doDeletes = true;
     boolean doPuts = true;
+    boolean ignoreTimestamp;
 
     HashTable.TableHash sourceTableHash;
     HashTable.TableHash.Reader sourceHashReader;
@@ -211,6 +216,7 @@ public class SyncTable extends Configured implements Tool {
       dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
       doDeletes = conf.getBoolean(DO_DELETES_CONF_KEY, true);
       doPuts = conf.getBoolean(DO_PUTS_CONF_KEY, true);
+      ignoreTimestamp = conf.getBoolean(IGNORE_TIMESTAMPS, false);
 
       sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
       LOG.info("Read source hash manifest: " + sourceTableHash);
@@ -226,6 +232,7 @@ public class SyncTable extends Configured implements Tool {
       // instead, find the first hash batch at or after the start row
       // and skip any rows that come before.  they will be caught by the previous task
       targetHasher = new HashTable.ResultHasher();
+      targetHasher.ignoreTimestamps = ignoreTimestamp;
     }
 
     private static Connection openConnection(Configuration conf, String zkClusterConfKey,
@@ -473,6 +480,14 @@ public class SyncTable extends Configured implements Tool {
       }
     }
 
+    private Cell checkAndResetTimestamp(Cell sourceCell){
+      if (ignoreTimestamp) {
+        sourceCell = new KeyValue(sourceCell);
+        ((KeyValue) sourceCell).setTimestamp(System.currentTimeMillis());
+      }
+      return sourceCell;
+    }
+
     /**
      * Compare the cells for the given row from the source and target tables.
      * Count and log any differences.
@@ -501,6 +516,7 @@ public class SyncTable extends Configured implements Tool {
             if (put == null) {
               put = new Put(rowKey);
             }
+            sourceCell = checkAndResetTimestamp(sourceCell);
             put.add(sourceCell);
           }
 
@@ -544,6 +560,7 @@ public class SyncTable extends Configured implements Tool {
               if (put == null) {
                 put = new Put(rowKey);
               }
+              sourceCell = checkAndResetTimestamp(sourceCell);
               put.add(sourceCell);
             }
           }
@@ -604,7 +621,7 @@ public class SyncTable extends Configured implements Tool {
      * They are assumed to be of the same row.
      * Nulls are after non-nulls.
      */
-     private static int compareCellKeysWithinRow(Cell c1, Cell c2) {
+    private int compareCellKeysWithinRow(Cell c1, Cell c2) {
       if (c1 == null) {
         return 1; // source missing cell
       }
@@ -621,9 +638,12 @@ public class SyncTable extends Configured implements Tool {
       if (result != 0) {
         return result;
       }
-
-      // note timestamp comparison is inverted - more recent cells first
-      return CellComparator.compareTimestamps(c1, c2);
+      if (this.ignoreTimestamp) {
+        return 0;
+      } else{
+        // note timestamp comparison is inverted - more recent cells first
+        return CellComparator.compareTimestamps(c1, c2);
+      }
     }
 
     @Override
@@ -723,6 +743,10 @@ public class SyncTable extends Configured implements Tool {
     System.err.println("                  (defaults to true)");
     System.err.println(" doPuts           if false, does not perform puts ");
     System.err.println("                  (defaults to true)");
+    System.err.println(" ignoreTimestamps if true, ignores cells timestamps while comparing ");
+    System.err.println("                  cell values. Any missing cell on target then gets");
+    System.err.println("                  added with current time as timestamp ");
+    System.err.println("                  (defaults to false)");
     System.err.println();
     System.err.println("Args:");
     System.err.println(" sourcehashdir    path to HashTable output dir for source table");
@@ -786,6 +810,12 @@ public class SyncTable extends Configured implements Tool {
           continue;
         }
 
+        final String ignoreTimestampsKey = "--ignoreTimestamps=";
+        if (cmd.startsWith(ignoreTimestampsKey)) {
+          ignoreTimestamps = Boolean.parseBoolean(cmd.substring(ignoreTimestampsKey.length()));
+          continue;
+        }
+
         printUsage("Invalid argument '" + cmd + "'");
         return false;
       }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java
index b43f611..f51d495 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java
@@ -19,9 +19,9 @@ package org.apache.hadoop.hbase.mapreduce;
 
 import static org.junit.Assert.assertEquals;
 
-import java.io.IOException;
 import java.util.Arrays;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -91,7 +90,7 @@ public class TestSyncTable {
     writeTestData(sourceTableName, targetTableName);
     hashSourceTable(sourceTableName, testDir);
     Counters syncCounters = syncTables(sourceTableName, targetTableName, testDir);
-    assertEqualTables(90, sourceTableName, targetTableName);
+    assertEqualTables(90, sourceTableName, targetTableName, false);
     
     assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
     assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
@@ -150,8 +149,31 @@ public class TestSyncTable {
     TEST_UTIL.deleteTable(targetTableName);
   }
 
+  @Test
+  public void testSyncTableIgnoreTimestampsTrue() throws Exception {
+    final TableName sourceTableName = TableName.valueOf("testSyncTableIgnoreTimestampsTrue_source");
+    final TableName targetTableName = TableName.valueOf("testSyncTableIgnoreTimestampsTrue_target");
+    Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTableIgnoreTimestampsTrue");
+    long current = System.currentTimeMillis();
+    writeTestData(sourceTableName, targetTableName, current - 1000, current);
+    hashSourceTable(sourceTableName, testDir, "--ignoreTimestamps=true");
+    Counters syncCounters = syncTables(sourceTableName, targetTableName,
+      testDir, "--ignoreTimestamps=true");
+    assertEqualTables(90, sourceTableName, targetTableName, true);
+
+    assertEquals(50, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
+    assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
+    assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
+    assertEquals(30, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
+    assertEquals(30, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
+    assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
+
+    TEST_UTIL.deleteTable(sourceTableName);
+    TEST_UTIL.deleteTable(targetTableName);
+  }
+
   private void assertEqualTables(int expectedRows, TableName sourceTableName,
-    TableName targetTableName) throws Exception {
+      TableName targetTableName, boolean ignoreTimestamps) throws Exception {
     Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
     Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
     
@@ -198,7 +220,7 @@ public class TestSyncTable {
           if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
             Assert.fail("Qualifiers don't match");
           }
-          if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) {
+          if (!ignoreTimestamps && !CellUtil.matchingTimestamp(sourceCell, targetCell)) {
             Assert.fail("Timestamps don't match");
           }
           if (!CellUtil.matchingValue(sourceCell, targetCell)) {
@@ -428,18 +450,19 @@ public class TestSyncTable {
     return syncTable.counters;
   }
 
-  private void hashSourceTable(TableName sourceTableName, Path testDir)
-      throws Exception, IOException {
+  private void hashSourceTable(TableName sourceTableName, Path testDir, String... options)
+      throws Exception {
     int numHashFiles = 3;
     long batchSize = 100;  // should be 2 batches per region
     int scanBatch = 1;
     HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
-    int code = hashTable.run(new String[] { 
-        "--batchsize=" + batchSize,
-        "--numhashfiles=" + numHashFiles,
-        "--scanbatch=" + scanBatch,
-        sourceTableName.getNameAsString(),
-        testDir.toString()});
+    String[] args = Arrays.copyOf(options, options.length + 5);
+    args[options.length] = "--batchsize=" + batchSize;
+    args[options.length + 1] = "--numhashfiles=" + numHashFiles;
+    args[options.length + 2] = "--scanbatch=" + scanBatch;
+    args[options.length + 3] = sourceTableName.getNameAsString();
+    args[options.length + 4] = testDir.toString();
+    int code = hashTable.run(args);
     assertEquals("hash table job failed", 0, code);
     
     FileSystem fs = TEST_UTIL.getTestFileSystem();
@@ -453,8 +476,8 @@ public class TestSyncTable {
     LOG.info("Hash table completed");
   }
 
-  private void writeTestData(TableName sourceTableName, TableName targetTableName)
-      throws Exception {
+  private void writeTestData(TableName sourceTableName, TableName targetTableName,
+      long... timestamps) throws Exception {
     final byte[] family = Bytes.toBytes("family");
     final byte[] column1 = Bytes.toBytes("c1");
     final byte[] column2 = Bytes.toBytes("c2");
@@ -465,26 +488,27 @@ public class TestSyncTable {
     int numRows = 100;
     int sourceRegions = 10;
     int targetRegions = 6;
-
+    if (ArrayUtils.isEmpty(timestamps)) {
+      long current = System.currentTimeMillis();
+      timestamps = new long[]{current,current};
+    }
     Table sourceTable = TEST_UTIL.createTable(sourceTableName,
       family, generateSplits(numRows, sourceRegions));
 
     Table targetTable = TEST_UTIL.createTable(targetTableName,
       family, generateSplits(numRows, targetRegions));
 
-    long timestamp = 1430764183454L;
-
     int rowIndex = 0;
     // a bunch of identical rows
     for (; rowIndex < 40; rowIndex++) {
       Put sourcePut = new Put(Bytes.toBytes(rowIndex));
-      sourcePut.addColumn(family, column1, timestamp, value1);
-      sourcePut.addColumn(family, column2, timestamp, value2);
+      sourcePut.addColumn(family, column1, timestamps[0], value1);
+      sourcePut.addColumn(family, column2, timestamps[0], value2);
       sourceTable.put(sourcePut);
      
       Put targetPut = new Put(Bytes.toBytes(rowIndex));
-      targetPut.addColumn(family, column1, timestamp, value1);
-      targetPut.addColumn(family, column2, timestamp, value2);
+      targetPut.addColumn(family, column1, timestamps[1], value1);
+      targetPut.addColumn(family, column2, timestamps[1], value2);
       targetTable.put(targetPut);
     }
     // some rows only in the source table
@@ -493,8 +517,8 @@ public class TestSyncTable {
     // TARGETMISSINGCELLS: 20
     for (; rowIndex < 50; rowIndex++) {
       Put put = new Put(Bytes.toBytes(rowIndex));
-      put.addColumn(family, column1, timestamp, value1);
-      put.addColumn(family, column2, timestamp, value2);
+      put.addColumn(family, column1, timestamps[0], value1);
+      put.addColumn(family, column2, timestamps[0], value2);
       sourceTable.put(put);
     }
     // some rows only in the target table
@@ -503,8 +527,8 @@ public class TestSyncTable {
     // SOURCEMISSINGCELLS: 20
     for (; rowIndex < 60; rowIndex++) {
       Put put = new Put(Bytes.toBytes(rowIndex));
-      put.addColumn(family, column1, timestamp, value1);
-      put.addColumn(family, column2, timestamp, value2);
+      put.addColumn(family, column1, timestamps[1], value1);
+      put.addColumn(family, column2, timestamps[1], value2);
       targetTable.put(put);
     }
     // some rows with 1 missing cell in target table
@@ -512,12 +536,12 @@ public class TestSyncTable {
     // TARGETMISSINGCELLS: 10
     for (; rowIndex < 70; rowIndex++) {
       Put sourcePut = new Put(Bytes.toBytes(rowIndex));
-      sourcePut.addColumn(family, column1, timestamp, value1);
-      sourcePut.addColumn(family, column2, timestamp, value2);
+      sourcePut.addColumn(family, column1, timestamps[0], value1);
+      sourcePut.addColumn(family, column2, timestamps[0], value2);
       sourceTable.put(sourcePut);
 
       Put targetPut = new Put(Bytes.toBytes(rowIndex));
-      targetPut.addColumn(family, column1, timestamp, value1);
+      targetPut.addColumn(family, column1, timestamps[1], value1);
       targetTable.put(targetPut);
     }
     // some rows with 1 missing cell in source table
@@ -525,12 +549,12 @@ public class TestSyncTable {
     // SOURCEMISSINGCELLS: 10
     for (; rowIndex < 80; rowIndex++) {
       Put sourcePut = new Put(Bytes.toBytes(rowIndex));
-      sourcePut.addColumn(family, column1, timestamp, value1);
+      sourcePut.addColumn(family, column1, timestamps[0], value1);
       sourceTable.put(sourcePut);
 
       Put targetPut = new Put(Bytes.toBytes(rowIndex));
-      targetPut.addColumn(family, column1, timestamp, value1);
-      targetPut.addColumn(family, column2, timestamp, value2);
+      targetPut.addColumn(family, column1, timestamps[1], value1);
+      targetPut.addColumn(family, column2, timestamps[1], value2);
       targetTable.put(targetPut);
     }
     // some rows differing only in timestamp
@@ -539,13 +563,13 @@ public class TestSyncTable {
     // TARGETMISSINGCELLS: 20
     for (; rowIndex < 90; rowIndex++) {
       Put sourcePut = new Put(Bytes.toBytes(rowIndex));
-      sourcePut.addColumn(family, column1, timestamp, column1);
-      sourcePut.addColumn(family, column2, timestamp, value2);
+      sourcePut.addColumn(family, column1, timestamps[0], column1);
+      sourcePut.addColumn(family, column2, timestamps[0], value2);
       sourceTable.put(sourcePut);
 
       Put targetPut = new Put(Bytes.toBytes(rowIndex));
-      targetPut.addColumn(family, column1, timestamp+1, column1);
-      targetPut.addColumn(family, column2, timestamp-1, value2);
+      targetPut.addColumn(family, column1, timestamps[1]+1, column1);
+      targetPut.addColumn(family, column2, timestamps[1]-1, value2);
       targetTable.put(targetPut);
     }
     // some rows with different values
@@ -553,13 +577,13 @@ public class TestSyncTable {
     // DIFFERENTCELLVALUES: 20
     for (; rowIndex < numRows; rowIndex++) {
       Put sourcePut = new Put(Bytes.toBytes(rowIndex));
-      sourcePut.addColumn(family, column1, timestamp, value1);
-      sourcePut.addColumn(family, column2, timestamp, value2);
+      sourcePut.addColumn(family, column1, timestamps[0], value1);
+      sourcePut.addColumn(family, column2, timestamps[0], value2);
       sourceTable.put(sourcePut);
       
       Put targetPut = new Put(Bytes.toBytes(rowIndex));
-      targetPut.addColumn(family, column1, timestamp, value3);
-      targetPut.addColumn(family, column2, timestamp, value3);
+      targetPut.addColumn(family, column1, timestamps[1], value3);
+      targetPut.addColumn(family, column2, timestamps[1], value3);
       targetTable.put(targetPut);
     }