You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2017/09/02 17:33:06 UTC
[1/3] phoenix git commit: Revert "PHOENIX-4141 Fix flapping
TableSnapshotReadsMapReduceIT"
Repository: phoenix
Updated Branches:
refs/heads/master 01dbd123b -> 21630d85f
Revert "PHOENIX-4141 Fix flapping TableSnapshotReadsMapReduceIT"
This reverts commit 9fb318bd8f03911bc7cb333a0ec974a685156fb7.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/0ace91f7
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/0ace91f7
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/0ace91f7
Branch: refs/heads/master
Commit: 0ace91f715e5136404774f98d1441ba639618248
Parents: 01dbd12
Author: Samarth Jain <sa...@apache.org>
Authored: Sat Sep 2 10:27:58 2017 -0700
Committer: Samarth Jain <sa...@apache.org>
Committed: Sat Sep 2 10:27:58 2017 -0700
----------------------------------------------------------------------
.../apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0ace91f7/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
index 85fb0cd..39d97a1 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
@@ -91,7 +91,7 @@ public class TableSnapshotReadsMapReduceIT extends ParallelStatsDisabledIT {
// configure Phoenix M/R job to read snapshot
final Configuration conf = getUtility().getConfiguration();
Job job = Job.getInstance(conf);
- Path tmpDir = new Path("/tmp");
+ Path tmpDir = getUtility().getRandomDir();
PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName,
tmpDir, null, FIELD1, FIELD2, FIELD3);
@@ -111,7 +111,7 @@ public class TableSnapshotReadsMapReduceIT extends ParallelStatsDisabledIT {
// configure Phoenix M/R job to read snapshot
final Configuration conf = getUtility().getConfiguration();
Job job = Job.getInstance(conf);
- Path tmpDir = new Path("/tmp");
+ Path tmpDir = getUtility().getRandomDir();
PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName,
tmpDir, FIELD3 + " > 0001", FIELD1, FIELD2, FIELD3);
@@ -131,7 +131,7 @@ public class TableSnapshotReadsMapReduceIT extends ParallelStatsDisabledIT {
// configure Phoenix M/R job to read snapshot
final Configuration conf = getUtility().getConfiguration();
Job job = Job.getInstance(conf);
- Path tmpDir = new Path("/tmp");
+ Path tmpDir = getUtility().getRandomDir();
// Running limit with order by on non pk column
String inputQuery = "SELECT * FROM " + tableName + " ORDER BY FIELD2 LIMIT 1";
PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName,
[3/3] phoenix git commit: Revert "PHOENIX-4141 Fix flapping
TableSnapshotReadsMapReduceIT"
Posted by sa...@apache.org.
Revert "PHOENIX-4141 Fix flapping TableSnapshotReadsMapReduceIT"
This reverts commit 378b56c4ad71f9e10887adec92618300285f6d2d.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/21630d85
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/21630d85
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/21630d85
Branch: refs/heads/master
Commit: 21630d85f06d12db4b75776eb708814a0c360ec5
Parents: 38915fe
Author: Samarth Jain <sa...@apache.org>
Authored: Sat Sep 2 10:32:44 2017 -0700
Committer: Samarth Jain <sa...@apache.org>
Committed: Sat Sep 2 10:32:44 2017 -0700
----------------------------------------------------------------------
.../end2end/TableSnapshotReadsMapReduceIT.java | 402 +++++++++----------
1 file changed, 182 insertions(+), 220 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/21630d85/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
index 591f028..4cc2a20 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
@@ -18,25 +18,11 @@
package org.apache.phoenix.end2end;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
+
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.io.NullWritable;
@@ -46,215 +32,191 @@ import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.PhoenixIndexDBWritable;
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
-import org.apache.phoenix.util.EnvironmentEdge;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TableSnapshotReadsMapReduceIT extends ParallelStatsDisabledIT {
- private final static String SNAPSHOT_NAME = "FOO";
- private static final String FIELD1 = "FIELD1";
- private static final String FIELD2 = "FIELD2";
- private static final String FIELD3 = "FIELD3";
- private String CREATE_TABLE =
- "CREATE TABLE IF NOT EXISTS %s ( "
- + " FIELD1 VARCHAR NOT NULL , FIELD2 VARCHAR , FIELD3 INTEGER CONSTRAINT pk PRIMARY KEY (FIELD1 ))";
- private String UPSERT = "UPSERT into %s values (?, ?, ?)";
-
- private static List<List<Object>> result;
- private String tableName;
- private MyClock clock;
-
- @Before
- public void injectMyClock() {
- clock = new MyClock(1000);
- // Use our own clock to prevent race between partial rebuilder and compaction
- EnvironmentEdgeManager.injectEdge(clock);
- }
+import org.junit.*;
- @After
- public void removeMyClock() {
- EnvironmentEdgeManager.injectEdge(null);
- }
-
- @Test
- public void testMapReduceSnapshots() throws Exception {
- // create table
- Connection conn = DriverManager.getConnection(getUrl());
- tableName = generateUniqueName();
- conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
- conn.commit();
-
- // configure Phoenix M/R job to read snapshot
- final Configuration conf = getUtility().getConfiguration();
- Job job = Job.getInstance(conf);
- Path tmpDir = getUtility().getDataTestDirOnTestFS(SNAPSHOT_NAME);
-
- PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName,
- tmpDir, null, FIELD1, FIELD2, FIELD3);
-
- // configure and test job
- configureJob(job, tableName, null, null);
- }
-
- @Test
- public void testMapReduceSnapshotsWithCondition() throws Exception {
- // create table
- Connection conn = DriverManager.getConnection(getUrl());
- tableName = generateUniqueName();
- conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
- conn.commit();
-
- // configure Phoenix M/R job to read snapshot
- final Configuration conf = getUtility().getConfiguration();
- Job job = Job.getInstance(conf);
- Path tmpDir = getUtility().getDataTestDirOnTestFS(SNAPSHOT_NAME);
- PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName,
- tmpDir, FIELD3 + " > 0001", FIELD1, FIELD2, FIELD3);
-
- // configure and test job
- configureJob(job, tableName, null, "FIELD3 > 0001");
-
- }
-
- @Test
- public void testMapReduceSnapshotWithLimit() throws Exception {
- // create table
- Connection conn = DriverManager.getConnection(getUrl());
- tableName = generateUniqueName();
- conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
- conn.commit();
-
- // configure Phoenix M/R job to read snapshot
- final Configuration conf = getUtility().getConfiguration();
- Job job = Job.getInstance(conf);
- Path tmpDir = getUtility().getDataTestDirOnTestFS(SNAPSHOT_NAME);
- // Running limit with order by on non pk column
- String inputQuery = "SELECT * FROM " + tableName + " ORDER BY FIELD2 LIMIT 1";
- PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName,
- tmpDir, inputQuery);
-
- // configure and test job
- configureJob(job, tableName, inputQuery, null);
- }
-
- private void configureJob(Job job, String tableName, String inputQuery, String condition)
- throws Exception {
- try {
- upsertAndSnapshot(tableName);
- result = new ArrayList<>();
-
- job.setMapperClass(TableSnapshotMapper.class);
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
- job.setMapOutputValueClass(NullWritable.class);
- job.setOutputFormatClass(NullOutputFormat.class);
-
- Assert.assertTrue(job.waitForCompletion(true));
-
- // verify the result, should match the values at the corresponding timestamp
- Properties props = new Properties();
- props.setProperty("CurrentSCN", Long.toString(clock.time));
- StringBuilder selectQuery = new StringBuilder("SELECT * FROM " + tableName);
- if (condition != null) {
- selectQuery.append(" WHERE " + condition);
- }
- if (inputQuery == null) inputQuery = selectQuery.toString();
-
- ResultSet rs =
- DriverManager.getConnection(getUrl(), props).createStatement()
- .executeQuery(inputQuery);
-
- for (List<Object> r : result) {
- assertTrue("No data stored in the table!", rs.next());
- int i = 0;
- String field1 = rs.getString(i + 1);
- assertEquals("Got the incorrect value for field1", r.get(i++), field1);
- String field2 = rs.getString(i + 1);
- assertEquals("Got the incorrect value for field2", r.get(i++), field2);
- int field3 = rs.getInt(i + 1);
- assertEquals("Got the incorrect value for field3", r.get(i++), field3);
- }
-
- assertFalse(
- "Should only have stored " + result.size() + "rows in the table for the timestamp!",
- rs.next());
- } finally {
- deleteSnapshotAndTable(tableName);
- }
- }
-
- private static class MyClock extends EnvironmentEdge {
- public volatile long time;
-
- public MyClock(long time) {
- this.time = time;
- }
-
- @Override
- public long currentTime() {
- return time;
- }
- }
-
- private void upsertData(String tableName) throws SQLException {
- Connection conn = DriverManager.getConnection(getUrl());
- PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName));
- upsertData(stmt, "CCCC", "SSDD", 0001);
- upsertData(stmt, "CCCC", "HDHG", 0005);
- upsertData(stmt, "BBBB", "JSHJ", 0002);
- upsertData(stmt, "AAAA", "JHHD", 0003);
- conn.commit();
- }
-
- private void upsertData(PreparedStatement stmt, String field1, String field2, int field3)
- throws SQLException {
- stmt.setString(1, field1);
- stmt.setString(2, field2);
- stmt.setInt(3, field3);
- stmt.execute();
- }
-
- public void upsertAndSnapshot(String tableName) throws Exception {
- clock.time += 1000;
- upsertData(tableName);
-
- Connection conn = DriverManager.getConnection(getUrl());
- HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
- admin.snapshot(SNAPSHOT_NAME, TableName.valueOf(tableName));
- // call flush to create new files in the region
- admin.flush(tableName);
-
- List<HBaseProtos.SnapshotDescription> snapshots = admin.listSnapshots();
- Assert.assertEquals(tableName, snapshots.get(0).getTable());
+import java.io.IOException;
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
- clock.time += 1000;
- // upsert data after snapshot
- PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName));
- upsertData(stmt, "DDDD", "SNFB", 0004);
- conn.commit();
- }
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
- public void deleteSnapshotAndTable(String tableName) throws Exception {
- Connection conn = DriverManager.getConnection(getUrl());
- HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
- admin.deleteSnapshot(SNAPSHOT_NAME);
+public class TableSnapshotReadsMapReduceIT extends ParallelStatsDisabledIT {
+ private final static String SNAPSHOT_NAME = "FOO";
+ private static final String FIELD1 = "FIELD1";
+ private static final String FIELD2 = "FIELD2";
+ private static final String FIELD3 = "FIELD3";
+ private String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +
+ " FIELD1 VARCHAR NOT NULL , FIELD2 VARCHAR , FIELD3 INTEGER CONSTRAINT pk PRIMARY KEY (FIELD1 ))";
+ private String UPSERT = "UPSERT into %s values (?, ?, ?)";
+
+ private static List<List<Object>> result;
+ private long timestamp;
+ private String tableName;
+
+
+ @Test
+ public void testMapReduceSnapshots() throws Exception {
+ // create table
+ Connection conn = DriverManager.getConnection(getUrl());
+ tableName = generateUniqueName();
+ conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
+ conn.commit();
+
+ // configure Phoenix M/R job to read snapshot
+ final Configuration conf = getUtility().getConfiguration();
+ Job job = Job.getInstance(conf);
+ Path tmpDir = getUtility().getRandomDir();
+
+ PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,SNAPSHOT_NAME,tableName,tmpDir, null, FIELD1, FIELD2, FIELD3);
+
+ // configure and test job
+ configureJob(job, tableName, null, null);
+ }
+
+ @Test
+ public void testMapReduceSnapshotsWithCondition() throws Exception {
+ // create table
+ Connection conn = DriverManager.getConnection(getUrl());
+ tableName = generateUniqueName();
+ conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
+ conn.commit();
+
+ // configure Phoenix M/R job to read snapshot
+ final Configuration conf = getUtility().getConfiguration();
+ Job job = Job.getInstance(conf);
+ Path tmpDir = getUtility().getRandomDir();
+ PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,SNAPSHOT_NAME,tableName,tmpDir, FIELD3 + " > 0001", FIELD1, FIELD2, FIELD3);
+
+ // configure and test job
+ configureJob(job, tableName, null, "FIELD3 > 0001");
+
+ }
+
+ @Test
+ public void testMapReduceSnapshotWithLimit() throws Exception {
+ // create table
+ Connection conn = DriverManager.getConnection(getUrl());
+ tableName = generateUniqueName();
+ conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
+ conn.commit();
+
+ // configure Phoenix M/R job to read snapshot
+ final Configuration conf = getUtility().getConfiguration();
+ Job job = Job.getInstance(conf);
+ Path tmpDir = getUtility().getRandomDir();
+ // Running limit with order by on non pk column
+ String inputQuery = "SELECT * FROM " + tableName + " ORDER BY FIELD2 LIMIT 1";
+ PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,SNAPSHOT_NAME,tableName,tmpDir,inputQuery);
+
+ // configure and test job
+ configureJob(job, tableName, inputQuery, null);
+ }
+
+ private void configureJob(Job job, String tableName, String inputQuery, String condition) throws Exception {
+ try {
+ upsertAndSnapshot(tableName);
+ result = new ArrayList<>();
+
+ job.setMapperClass(TableSnapshotMapper.class);
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ job.setMapOutputValueClass(NullWritable.class);
+ job.setOutputFormatClass(NullOutputFormat.class);
+
+ Assert.assertTrue(job.waitForCompletion(true));
+
+ // verify the result, should match the values at the corresponding timestamp
+ Properties props = new Properties();
+ props.setProperty("CurrentSCN", Long.toString(timestamp));
+
+ StringBuilder selectQuery = new StringBuilder("SELECT * FROM " + tableName);
+ if (condition != null) {
+ selectQuery.append(" WHERE " + condition);
+ }
+ if (inputQuery == null)
+ inputQuery = selectQuery.toString();
+
+ ResultSet rs = DriverManager.getConnection(getUrl(), props).createStatement().executeQuery(inputQuery);
+
+ for (List<Object> r : result) {
+ assertTrue("No data stored in the table!", rs.next());
+ int i = 0;
+ String field1 = rs.getString(i + 1);
+ assertEquals("Got the incorrect value for field1", r.get(i++), field1);
+ String field2 = rs.getString(i + 1);
+ assertEquals("Got the incorrect value for field2", r.get(i++), field2);
+ int field3 = rs.getInt(i + 1);
+ assertEquals("Got the incorrect value for field3", r.get(i++), field3);
+ }
+
+ assertFalse("Should only have stored" + result.size() + "rows in the table for the timestamp!", rs.next());
+ } finally {
+ deleteSnapshotAndTable(tableName);
}
-
- public static class TableSnapshotMapper extends
- Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, NullWritable> {
-
- @Override
- protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
- throws IOException, InterruptedException {
- final List<Object> values = record.getValues();
- result.add(values);
-
- // write dummy data
- context.write(new ImmutableBytesWritable(UUID.randomUUID().toString().getBytes()),
- NullWritable.get());
- }
+ }
+
+ private void upsertData(String tableName) throws SQLException {
+ Connection conn = DriverManager.getConnection(getUrl());
+ PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName));
+ upsertData(stmt, "CCCC", "SSDD", 0001);
+ upsertData(stmt, "CCCC", "HDHG", 0005);
+ upsertData(stmt, "BBBB", "JSHJ", 0002);
+ upsertData(stmt, "AAAA", "JHHD", 0003);
+ conn.commit();
+ timestamp = System.currentTimeMillis();
+ }
+
+ private void upsertData(PreparedStatement stmt, String field1, String field2, int field3) throws SQLException {
+ stmt.setString(1, field1);
+ stmt.setString(2, field2);
+ stmt.setInt(3, field3);
+ stmt.execute();
+ }
+
+ public void upsertAndSnapshot(String tableName) throws Exception {
+ upsertData(tableName);
+
+ Connection conn = DriverManager.getConnection(getUrl());
+ HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+ admin.snapshot(SNAPSHOT_NAME, TableName.valueOf(tableName));
+ // call flush to create new files in the region
+ admin.flush(tableName);
+
+ List<HBaseProtos.SnapshotDescription> snapshots = admin.listSnapshots();
+ Assert.assertEquals(tableName, snapshots.get(0).getTable());
+
+ // upsert data after snapshot
+ PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName));
+ upsertData(stmt, "DDDD", "SNFB", 0004);
+ conn.commit();
+ }
+
+ public void deleteSnapshotAndTable(String tableName) throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+ admin.deleteSnapshot(SNAPSHOT_NAME);
+
+ conn.createStatement().execute("DROP TABLE " + tableName);
+ conn.close();
+
+ }
+
+ public static class TableSnapshotMapper extends Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, NullWritable> {
+
+ @Override
+ protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
+ throws IOException, InterruptedException {
+ final List<Object> values = record.getValues();
+ result.add(values);
+
+ // write dummy data
+ context.write(new ImmutableBytesWritable(UUID.randomUUID().toString().getBytes()),
+ NullWritable.get());
}
+ }
}
[2/3] phoenix git commit: Revert "PHOENIX-4141 Addendum to fix test
failure"
Posted by sa...@apache.org.
Revert "PHOENIX-4141 Addendum to fix test failure"
This reverts commit 7d8b8430212fae117ac09faf6b7c22bf673e9073.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/38915fee
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/38915fee
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/38915fee
Branch: refs/heads/master
Commit: 38915feef5f590a882cdc45c7e880943ab099603
Parents: 0ace91f
Author: Samarth Jain <sa...@apache.org>
Authored: Sat Sep 2 10:32:24 2017 -0700
Committer: Samarth Jain <sa...@apache.org>
Committed: Sat Sep 2 10:32:24 2017 -0700
----------------------------------------------------------------------
.../end2end/TableSnapshotReadsMapReduceIT.java | 16 +++++-----------
1 file changed, 5 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/38915fee/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
index 39d97a1..591f028 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
@@ -65,7 +65,6 @@ public class TableSnapshotReadsMapReduceIT extends ParallelStatsDisabledIT {
private static List<List<Object>> result;
private String tableName;
-
private MyClock clock;
@Before
@@ -91,7 +90,7 @@ public class TableSnapshotReadsMapReduceIT extends ParallelStatsDisabledIT {
// configure Phoenix M/R job to read snapshot
final Configuration conf = getUtility().getConfiguration();
Job job = Job.getInstance(conf);
- Path tmpDir = getUtility().getRandomDir();
+ Path tmpDir = getUtility().getDataTestDirOnTestFS(SNAPSHOT_NAME);
PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName,
tmpDir, null, FIELD1, FIELD2, FIELD3);
@@ -111,7 +110,7 @@ public class TableSnapshotReadsMapReduceIT extends ParallelStatsDisabledIT {
// configure Phoenix M/R job to read snapshot
final Configuration conf = getUtility().getConfiguration();
Job job = Job.getInstance(conf);
- Path tmpDir = getUtility().getRandomDir();
+ Path tmpDir = getUtility().getDataTestDirOnTestFS(SNAPSHOT_NAME);
PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName,
tmpDir, FIELD3 + " > 0001", FIELD1, FIELD2, FIELD3);
@@ -131,7 +130,7 @@ public class TableSnapshotReadsMapReduceIT extends ParallelStatsDisabledIT {
// configure Phoenix M/R job to read snapshot
final Configuration conf = getUtility().getConfiguration();
Job job = Job.getInstance(conf);
- Path tmpDir = getUtility().getRandomDir();
+ Path tmpDir = getUtility().getDataTestDirOnTestFS(SNAPSHOT_NAME);
// Running limit with order by on non pk column
String inputQuery = "SELECT * FROM " + tableName + " ORDER BY FIELD2 LIMIT 1";
PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName,
@@ -157,7 +156,6 @@ public class TableSnapshotReadsMapReduceIT extends ParallelStatsDisabledIT {
// verify the result, should match the values at the corresponding timestamp
Properties props = new Properties();
props.setProperty("CurrentSCN", Long.toString(clock.time));
-
StringBuilder selectQuery = new StringBuilder("SELECT * FROM " + tableName);
if (condition != null) {
selectQuery.append(" WHERE " + condition);
@@ -180,7 +178,7 @@ public class TableSnapshotReadsMapReduceIT extends ParallelStatsDisabledIT {
}
assertFalse(
- "Should only have stored" + result.size() + "rows in the table for the timestamp!",
+ "Should only have stored " + result.size() + "rows in the table for the timestamp!",
rs.next());
} finally {
deleteSnapshotAndTable(tableName);
@@ -242,10 +240,6 @@ public class TableSnapshotReadsMapReduceIT extends ParallelStatsDisabledIT {
Connection conn = DriverManager.getConnection(getUrl());
HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
admin.deleteSnapshot(SNAPSHOT_NAME);
-
- conn.createStatement().execute("DROP TABLE " + tableName);
- conn.close();
-
}
public static class TableSnapshotMapper extends
@@ -263,4 +257,4 @@ public class TableSnapshotReadsMapReduceIT extends ParallelStatsDisabledIT {
}
}
-}
\ No newline at end of file
+}