You are viewing a plain text version of this content. The canonical link for it is here.
Posted on 2017/09/02 17:33:08 UTC

[3/3] phoenix git commit: Revert "PHOENIX-4141 Fix flapping TableSnapshotReadsMapReduceIT"

Revert "PHOENIX-4141 Fix flapping TableSnapshotReadsMapReduceIT"

This reverts commit 378b56c4ad71f9e10887adec92618300285f6d2d.


Branch: refs/heads/master
Commit: 21630d85f06d12db4b75776eb708814a0c360ec5
Parents: 38915fe
Author: Samarth Jain <>
Authored: Sat Sep 2 10:32:44 2017 -0700
Committer: Samarth Jain <>
Committed: Sat Sep 2 10:32:44 2017 -0700

 .../end2end/TableSnapshotReadsMapReduceIT.java  | 402 +++++++++----------
 1 file changed, 182 insertions(+), 220 deletions(-)
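diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
index 591f028..4cc2a20 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java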
@@ -18,25 +18,11 @@
 package org.apache.phoenix.end2end;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
@@ -46,215 +32,191 @@ import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.PhoenixIndexDBWritable;
 import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
-import org.apache.phoenix.util.EnvironmentEdge;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-public class TableSnapshotReadsMapReduceIT extends ParallelStatsDisabledIT {
-    private final static String SNAPSHOT_NAME = "FOO";
-    private static final String FIELD1 = "FIELD1";
-    private static final String FIELD2 = "FIELD2";
-    private static final String FIELD3 = "FIELD3";
-    private String CREATE_TABLE =
-            "CREATE TABLE IF NOT EXISTS %s ( "
-    private String UPSERT = "UPSERT into %s values (?, ?, ?)";
-    private static List<List<Object>> result;
-    private String tableName;
-    private MyClock clock;
-    @Before
-    public void injectMyClock() {
-        clock = new MyClock(1000);
-        // Use our own clock to prevent race between partial rebuilder and compaction
-        EnvironmentEdgeManager.injectEdge(clock);
-    }
+import org.junit.*;
-    @After
-    public void removeMyClock() {
-        EnvironmentEdgeManager.injectEdge(null);
-    }
-    @Test
-    public void testMapReduceSnapshots() throws Exception {
-        // create table
-        Connection conn = DriverManager.getConnection(getUrl());
-        tableName = generateUniqueName();
-        conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
-        conn.commit();
-        // configure Phoenix M/R job to read snapshot
-        final Configuration conf = getUtility().getConfiguration();
-        Job job = Job.getInstance(conf);
-        Path tmpDir = getUtility().getDataTestDirOnTestFS(SNAPSHOT_NAME);
-        PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName,
-            tmpDir, null, FIELD1, FIELD2, FIELD3);
-        // configure and test job
-        configureJob(job, tableName, null, null);
-    }
-    @Test
-    public void testMapReduceSnapshotsWithCondition() throws Exception {
-        // create table
-        Connection conn = DriverManager.getConnection(getUrl());
-        tableName = generateUniqueName();
-        conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
-        conn.commit();
-        // configure Phoenix M/R job to read snapshot
-        final Configuration conf = getUtility().getConfiguration();
-        Job job = Job.getInstance(conf);
-        Path tmpDir = getUtility().getDataTestDirOnTestFS(SNAPSHOT_NAME);
-        PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName,
-            tmpDir, FIELD3 + " > 0001", FIELD1, FIELD2, FIELD3);
-        // configure and test job
-        configureJob(job, tableName, null, "FIELD3 > 0001");
-    }
-    @Test
-    public void testMapReduceSnapshotWithLimit() throws Exception {
-        // create table
-        Connection conn = DriverManager.getConnection(getUrl());
-        tableName = generateUniqueName();
-        conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
-        conn.commit();
-        // configure Phoenix M/R job to read snapshot
-        final Configuration conf = getUtility().getConfiguration();
-        Job job = Job.getInstance(conf);
-        Path tmpDir = getUtility().getDataTestDirOnTestFS(SNAPSHOT_NAME);
-        // Running limit with order by on non pk column
-        String inputQuery = "SELECT * FROM " + tableName + " ORDER BY FIELD2 LIMIT 1";
-        PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName,
-            tmpDir, inputQuery);
-        // configure and test job
-        configureJob(job, tableName, inputQuery, null);
-    }
-    private void configureJob(Job job, String tableName, String inputQuery, String condition)
-            throws Exception {
-        try {
-            upsertAndSnapshot(tableName);
-            result = new ArrayList<>();
-            job.setMapperClass(TableSnapshotMapper.class);
-            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-            job.setMapOutputValueClass(NullWritable.class);
-            job.setOutputFormatClass(NullOutputFormat.class);
-            Assert.assertTrue(job.waitForCompletion(true));
-            // verify the result, should match the values at the corresponding timestamp
-            Properties props = new Properties();
-            props.setProperty("CurrentSCN", Long.toString(clock.time));
-            StringBuilder selectQuery = new StringBuilder("SELECT * FROM " + tableName);
-            if (condition != null) {
-                selectQuery.append(" WHERE " + condition);
-            }
-            if (inputQuery == null) inputQuery = selectQuery.toString();
-            ResultSet rs =
-                    DriverManager.getConnection(getUrl(), props).createStatement()
-                            .executeQuery(inputQuery);
-            for (List<Object> r : result) {
-                assertTrue("No data stored in the table!", rs.next());
-                int i = 0;
-                String field1 = rs.getString(i + 1);
-                assertEquals("Got the incorrect value for field1", r.get(i++), field1);
-                String field2 = rs.getString(i + 1);
-                assertEquals("Got the incorrect value for field2", r.get(i++), field2);
-                int field3 = rs.getInt(i + 1);
-                assertEquals("Got the incorrect value for field3", r.get(i++), field3);
-            }
-            assertFalse(
-                "Should only have stored " + result.size() + "rows in the table for the timestamp!",
-                rs.next());
-        } finally {
-            deleteSnapshotAndTable(tableName);
-        }
-    }
-    private static class MyClock extends EnvironmentEdge {
-        public volatile long time;
-        public MyClock(long time) {
-            this.time = time;
-        }
-        @Override
-        public long currentTime() {
-            return time;
-        }
-    }
-    private void upsertData(String tableName) throws SQLException {
-        Connection conn = DriverManager.getConnection(getUrl());
-        PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName));
-        upsertData(stmt, "CCCC", "SSDD", 0001);
-        upsertData(stmt, "CCCC", "HDHG", 0005);
-        upsertData(stmt, "BBBB", "JSHJ", 0002);
-        upsertData(stmt, "AAAA", "JHHD", 0003);
-        conn.commit();
-    }
-    private void upsertData(PreparedStatement stmt, String field1, String field2, int field3)
-            throws SQLException {
-        stmt.setString(1, field1);
-        stmt.setString(2, field2);
-        stmt.setInt(3, field3);
-        stmt.execute();
-    }
-    public void upsertAndSnapshot(String tableName) throws Exception {
-        clock.time += 1000;
-        upsertData(tableName);
-        Connection conn = DriverManager.getConnection(getUrl());
-        HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
-        admin.snapshot(SNAPSHOT_NAME, TableName.valueOf(tableName));
-        // call flush to create new files in the region
-        admin.flush(tableName);
-        List<HBaseProtos.SnapshotDescription> snapshots = admin.listSnapshots();
-        Assert.assertEquals(tableName, snapshots.get(0).getTable());
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
-        clock.time += 1000;
-        // upsert data after snapshot
-        PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName));
-        upsertData(stmt, "DDDD", "SNFB", 0004);
-        conn.commit();
-    }
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
-    public void deleteSnapshotAndTable(String tableName) throws Exception {
-        Connection conn = DriverManager.getConnection(getUrl());
-        HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
-        admin.deleteSnapshot(SNAPSHOT_NAME);
+public class TableSnapshotReadsMapReduceIT extends ParallelStatsDisabledIT {
+  private final static String SNAPSHOT_NAME = "FOO";
+  private static final String FIELD1 = "FIELD1";
+  private static final String FIELD2 = "FIELD2";
+  private static final String FIELD3 = "FIELD3";
+  private String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +
+  private String UPSERT = "UPSERT into %s values (?, ?, ?)";
+  private static List<List<Object>> result;
+  private long timestamp;
+  private String tableName;
+  @Test
+  public void testMapReduceSnapshots() throws Exception {
+    // create table
+    Connection conn = DriverManager.getConnection(getUrl());
+    tableName = generateUniqueName();
+    conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
+    conn.commit();
+    // configure Phoenix M/R job to read snapshot
+    final Configuration conf = getUtility().getConfiguration();
+    Job job = Job.getInstance(conf);
+    Path tmpDir = getUtility().getRandomDir();
+    PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,SNAPSHOT_NAME,tableName,tmpDir, null, FIELD1, FIELD2, FIELD3);
+    // configure and test job
+    configureJob(job, tableName, null, null);
+  }
+  @Test
+  public void testMapReduceSnapshotsWithCondition() throws Exception {
+    // create table
+    Connection conn = DriverManager.getConnection(getUrl());
+    tableName = generateUniqueName();
+    conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
+    conn.commit();
+    // configure Phoenix M/R job to read snapshot
+    final Configuration conf = getUtility().getConfiguration();
+    Job job = Job.getInstance(conf);
+    Path tmpDir = getUtility().getRandomDir();
+    PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,SNAPSHOT_NAME,tableName,tmpDir, FIELD3 + " > 0001", FIELD1, FIELD2, FIELD3);
+    // configure and test job
+    configureJob(job, tableName, null, "FIELD3 > 0001");
+  }
+  @Test
+  public void testMapReduceSnapshotWithLimit() throws Exception {
+    // create table
+    Connection conn = DriverManager.getConnection(getUrl());
+    tableName = generateUniqueName();
+    conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
+    conn.commit();
+    // configure Phoenix M/R job to read snapshot
+    final Configuration conf = getUtility().getConfiguration();
+    Job job = Job.getInstance(conf);
+    Path tmpDir = getUtility().getRandomDir();
+    // Running limit with order by on non pk column
+    String inputQuery = "SELECT * FROM " + tableName + " ORDER BY FIELD2 LIMIT 1";
+    PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,SNAPSHOT_NAME,tableName,tmpDir,inputQuery);
+    // configure and test job
+    configureJob(job, tableName, inputQuery, null);
+  }
+  private void configureJob(Job job, String tableName, String inputQuery, String condition) throws Exception {
+    try {
+      upsertAndSnapshot(tableName);
+      result = new ArrayList<>();
+      job.setMapperClass(TableSnapshotMapper.class);
+      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+      job.setMapOutputValueClass(NullWritable.class);
+      job.setOutputFormatClass(NullOutputFormat.class);
+      Assert.assertTrue(job.waitForCompletion(true));
+      // verify the result, should match the values at the corresponding timestamp
+      Properties props = new Properties();
+      props.setProperty("CurrentSCN", Long.toString(timestamp));
+      StringBuilder selectQuery = new StringBuilder("SELECT * FROM " + tableName);
+      if (condition != null) {
+        selectQuery.append(" WHERE " + condition);
+      }
+      if (inputQuery == null)
+        inputQuery = selectQuery.toString();
+      ResultSet rs = DriverManager.getConnection(getUrl(), props).createStatement().executeQuery(inputQuery);
+      for (List<Object> r : result) {
+        assertTrue("No data stored in the table!", rs.next());
+        int i = 0;
+        String field1 = rs.getString(i + 1);
+        assertEquals("Got the incorrect value for field1", r.get(i++), field1);
+        String field2 = rs.getString(i + 1);
+        assertEquals("Got the incorrect value for field2", r.get(i++), field2);
+        int field3 = rs.getInt(i + 1);
+        assertEquals("Got the incorrect value for field3", r.get(i++), field3);
+      }
+      assertFalse("Should only have stored" + result.size() + "rows in the table for the timestamp!", rs.next());
+    } finally {
+      deleteSnapshotAndTable(tableName);
-    public static class TableSnapshotMapper extends
-            Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, NullWritable> {
-        @Override
-        protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
-                throws IOException, InterruptedException {
-            final List<Object> values = record.getValues();
-            result.add(values);
-            // write dummy data
-            context.write(new ImmutableBytesWritable(UUID.randomUUID().toString().getBytes()),
-                NullWritable.get());
-        }
+  }
+  private void upsertData(String tableName) throws SQLException {
+    Connection conn = DriverManager.getConnection(getUrl());
+    PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName));
+    upsertData(stmt, "CCCC", "SSDD", 0001);
+    upsertData(stmt, "CCCC", "HDHG", 0005);
+    upsertData(stmt, "BBBB", "JSHJ", 0002);
+    upsertData(stmt, "AAAA", "JHHD", 0003);
+    conn.commit();
+    timestamp = System.currentTimeMillis();
+  }
+  private void upsertData(PreparedStatement stmt, String field1, String field2, int field3) throws SQLException {
+    stmt.setString(1, field1);
+    stmt.setString(2, field2);
+    stmt.setInt(3, field3);
+    stmt.execute();
+  }
+  public void upsertAndSnapshot(String tableName) throws Exception {
+    upsertData(tableName);
+    Connection conn = DriverManager.getConnection(getUrl());
+    HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+    admin.snapshot(SNAPSHOT_NAME, TableName.valueOf(tableName));
+    // call flush to create new files in the region
+    admin.flush(tableName);
+    List<HBaseProtos.SnapshotDescription> snapshots = admin.listSnapshots();
+    Assert.assertEquals(tableName, snapshots.get(0).getTable());
+    // upsert data after snapshot
+    PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName));
+    upsertData(stmt, "DDDD", "SNFB", 0004);
+    conn.commit();
+  }
+  public void deleteSnapshotAndTable(String tableName) throws Exception {
+    Connection conn = DriverManager.getConnection(getUrl());
+    HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+    admin.deleteSnapshot(SNAPSHOT_NAME);
+    conn.createStatement().execute("DROP TABLE " + tableName);
+    conn.close();
+  }
+  public static class TableSnapshotMapper extends Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, NullWritable> {
+    @Override
+    protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
+        throws IOException, InterruptedException {
+      final List<Object> values = record.getValues();
+      result.add(values);
+      // write dummy data
+      context.write(new ImmutableBytesWritable(UUID.randomUUID().toString().getBytes()),
+          NullWritable.get());
+  }