Posted to commits@phoenix.apache.org by ja...@apache.org on 2016/09/22 05:43:52 UTC
[5/6] phoenix git commit: PHOENIX-3304 Tracing tests failing
PHOENIX-3304 Tracing tests failing
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5a448200
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5a448200
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5a448200
Branch: refs/heads/4.x-HBase-1.1
Commit: 5a448200204916c4dfcc87b180b9fcfd05b840e0
Parents: a92f304
Author: James Taylor <ja...@apache.org>
Authored: Tue Sep 20 22:39:19 2016 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Wed Sep 21 22:45:03 2016 -0700
----------------------------------------------------------------------
.../phoenix/end2end/StatsCollectorIT.java | 33 ++++------
.../apache/phoenix/end2end/StoreNullsIT.java | 68 ++------------------
.../apache/phoenix/trace/BaseTracingTestIT.java | 14 ----
.../phoenix/trace/PhoenixTracingEndToEndIT.java | 22 +++----
.../apache/phoenix/trace/TracingTestUtil.java | 8 ++-
.../query/ConnectionQueryServicesImpl.java | 1 +
.../java/org/apache/phoenix/util/TestUtil.java | 55 ++++++++++++++++
7 files changed, 90 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a448200/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
index dd7741a..9a1ea26 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
@@ -40,8 +40,8 @@ import java.util.Random;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.ConnectionQueryServices;
@@ -77,8 +77,8 @@ public class StatsCollectorIT extends ParallelStatsEnabledIT {
props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
props.put(QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB, Boolean.TRUE.toString());
props.put(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.TRUE.toString());
- props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(1024));
props.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.toString(true));
+ props.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
@@ -347,16 +347,8 @@ public class StatsCollectorIT extends ParallelStatsEnabledIT {
return stmt;
}
- private void compactTable(Connection conn, String tableName) throws IOException, InterruptedException, SQLException {
- ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
- HBaseAdmin admin = services.getAdmin();
- try {
- admin.flush(tableName);
- admin.majorCompact(tableName);
- Thread.sleep(10000); // FIXME: how do we know when compaction is done?
- } finally {
- admin.close();
- }
+ private void compactTable(Connection conn, String tableName) throws Exception {
+ TestUtil.doMajorCompaction(conn, tableName);
}
@Test
@@ -374,9 +366,6 @@ public class StatsCollectorIT extends ParallelStatsEnabledIT {
Connection conn;
PreparedStatement stmt;
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- if (minStatsUpdateFreq != null) {
- props.setProperty(QueryServices.MIN_STATS_UPDATE_FREQ_MS_ATTRIB, minStatsUpdateFreq.toString());
- }
conn = DriverManager.getConnection(getUrl(), props);
conn.createStatement().execute("CREATE TABLE " + tableName + "(k CHAR(1) PRIMARY KEY, v INTEGER, w INTEGER) "
+ HColumnDescriptor.KEEP_DELETED_CELLS + "=" + Boolean.FALSE);
@@ -391,11 +380,11 @@ public class StatsCollectorIT extends ParallelStatsEnabledIT {
compactTable(conn, tableName);
if (minStatsUpdateFreq == null) {
- conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
- }
- // Confirm that when we have a non-zero MIN_STATS_UPDATE_FREQ_MS_ATTRIB, after we run
- // UPDATE STATISTICS, the new statistics are faulted in as expected.
- if (minStatsUpdateFreq != null) {
+ ImmutableBytesPtr ptr = new ImmutableBytesPtr(Bytes.toBytes(tableName));
+ conn.unwrap(PhoenixConnection.class).getQueryServices().invalidateStats(ptr);
+ } else {
+ // Confirm that when we have a non-zero MIN_STATS_UPDATE_FREQ_MS_ATTRIB, after we run
+ // UPDATE STATISTICS, the new statistics are faulted in as expected.
List<KeyRange> keyRanges = getAllSplits(conn, tableName);
assertNotEquals(nRows+1, keyRanges.size());
// If we've set MIN_STATS_UPDATE_FREQ_MS_ATTRIB, an UPDATE STATISTICS will invalidate the cache
@@ -412,7 +401,8 @@ public class StatsCollectorIT extends ParallelStatsEnabledIT {
compactTable(conn, tableName);
if (minStatsUpdateFreq == null) {
- conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
+ ImmutableBytesPtr ptr = new ImmutableBytesPtr(Bytes.toBytes(tableName));
+ conn.unwrap(PhoenixConnection.class).getQueryServices().invalidateStats(ptr);
}
keyRanges = getAllSplits(conn, tableName);
@@ -429,7 +419,6 @@ public class StatsCollectorIT extends ParallelStatsEnabledIT {
+ PhoenixDatabaseMetaData.SYSTEM_STATS_NAME + " WHERE PHYSICAL_NAME='" + tableName + "'");
rs.next();
assertEquals(nRows - nDeletedRows, rs.getLong(1));
-
}
@Test
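With this change the test no longer calls clearCache(), which wipes every cached entry; it invalidates statistics for just the one table under test. A minimal sketch of that pattern, assuming the ImmutableBytesPtr and Bytes imports shown in the first hunk (the helper name is hypothetical):

    private static void invalidateTableStats(Connection conn, String tableName)
            throws SQLException {
        // key the invalidation by the physical table name, as the stats cache does
        ImmutableBytesPtr ptr = new ImmutableBytesPtr(Bytes.toBytes(tableName));
        conn.unwrap(PhoenixConnection.class).getQueryServices().invalidateStats(ptr);
    }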
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a448200/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java
index 904743a..c14cf39 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java
@@ -28,16 +28,11 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.List;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@@ -46,12 +41,11 @@ import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import com.google.common.collect.Lists;
-
/**
* Tests to demonstrate and verify the STORE_NULLS option on a table,
* which allows explicitly storing null values (as opposed to using HBase Deletes) for nulls. This
@@ -132,7 +126,7 @@ public class StoreNullsIT extends ParallelStatsDisabledIT {
}
@Test
- public void testQueryingHistory() throws SQLException, InterruptedException, IOException {
+ public void testQueryingHistory() throws Exception {
stmt.executeUpdate("UPSERT INTO " + WITH_NULLS + " VALUES (1, 'v1')");
stmt.executeUpdate("UPSERT INTO " + WITHOUT_NULLS + " VALUES (1, 'v1')");
@@ -144,8 +138,8 @@ public class StoreNullsIT extends ParallelStatsDisabledIT {
stmt.executeUpdate("UPSERT INTO " + WITHOUT_NULLS + " VALUES (1, null)");
Thread.sleep(10L);
- doMajorCompaction(WITH_NULLS);
- doMajorCompaction(WITHOUT_NULLS);
+ TestUtil.doMajorCompaction(conn, WITH_NULLS);
+ TestUtil.doMajorCompaction(conn, WITHOUT_NULLS);
Properties historicalProps = new Properties();
historicalProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
@@ -171,7 +165,7 @@ public class StoreNullsIT extends ParallelStatsDisabledIT {
// Row deletes should work in the same way regardless of what STORE_NULLS is set to
@Test
- public void testDeletes() throws SQLException, InterruptedException, IOException {
+ public void testDeletes() throws Exception {
stmt.executeUpdate("UPSERT INTO " + WITH_NULLS + " VALUES (1, 'v1')");
stmt.executeUpdate("UPSERT INTO " + WITHOUT_NULLS + " VALUES (1, 'v1')");
@@ -183,8 +177,8 @@ public class StoreNullsIT extends ParallelStatsDisabledIT {
stmt.executeUpdate("DELETE FROM " + WITHOUT_NULLS + " WHERE id = 1");
Thread.sleep(10L);
- doMajorCompaction(WITH_NULLS);
- doMajorCompaction(WITHOUT_NULLS);
+ TestUtil.doMajorCompaction(conn, WITH_NULLS);
+ TestUtil.doMajorCompaction(conn, WITHOUT_NULLS);
Properties historicalProps = new Properties();
historicalProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
@@ -221,53 +215,5 @@ public class StoreNullsIT extends ParallelStatsDisabledIT {
assertTrue(rs.getBoolean(1));
}
- /**
- * Runs a major compaction, and then waits until the compaction is complete before returning.
- *
- * @param tableName name of the table to be compacted
- */
- private void doMajorCompaction(String tableName) throws IOException, InterruptedException {
-
- tableName = SchemaUtil.normalizeIdentifier(tableName);
-
- // We simply write a marker row, request a major compaction, and then wait until the marker
- // row is gone
- HTable htable = new HTable(getUtility().getConfiguration(), tableName);
- byte[] markerRowKey = Bytes.toBytes("TO_DELETE");
-
-
- Put put = new Put(markerRowKey);
- put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, HConstants.EMPTY_BYTE_ARRAY,
- HConstants.EMPTY_BYTE_ARRAY);
- htable.put(put);
- htable.delete(new Delete(markerRowKey));
- htable.close();
-
- HBaseAdmin hbaseAdmin = new HBaseAdmin(getUtility().getConfiguration());
- hbaseAdmin.flush(tableName);
- hbaseAdmin.majorCompact(tableName);
- hbaseAdmin.close();
-
- boolean compactionDone = false;
- while (!compactionDone) {
- Thread.sleep(2000L);
- htable = new HTable(getUtility().getConfiguration(), tableName);
- Scan scan = new Scan();
- scan.setStartRow(markerRowKey);
- scan.setStopRow(Bytes.add(markerRowKey, new byte[] { 0 }));
- scan.setRaw(true);
-
- ResultScanner scanner = htable.getScanner(scan);
- List<Result> results = Lists.newArrayList(scanner);
- LOG.info("Results: " + results);
- compactionDone = results.isEmpty();
- scanner.close();
-
- LOG.info("Compaction done: " + compactionDone);
- }
-
- htable.close();
- }
-
}
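With the compaction helper hoisted into TestUtil (its full body appears in the last hunk of this commit), a test only needs to hand over its Connection. A short usage sketch, with the table name and data hypothetical:

    @Test
    public void testQueryAfterCompaction() throws Exception {
        stmt.executeUpdate("UPSERT INTO T1 VALUES (1, 'v1')");
        conn.commit();
        stmt.executeUpdate("DELETE FROM T1 WHERE id = 1");
        conn.commit();
        // flushes, requests a major compaction, and blocks until it completes
        TestUtil.doMajorCompaction(conn, "T1");
    }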
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a448200/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java
index 7e5d17f..eed5618 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java
@@ -38,13 +38,10 @@ import org.apache.hadoop.metrics2.impl.ExposedMetricsRecordImpl;
import org.apache.hadoop.metrics2.lib.ExposedMetricsInfoImpl;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.metrics.MetricInfo;
-import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.trace.util.Tracing.Frequency;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
-import org.junit.Before;
/**
* Base test for tracing tests - helps manage getting tracing/non-tracing
@@ -52,16 +49,6 @@ import org.junit.Before;
*/
public class BaseTracingTestIT extends ParallelStatsDisabledIT {
- @Before
- public void resetTracingTableIfExists() throws Exception {
- Connection conn = getConnectionWithoutTracing();
- conn.setAutoCommit(true);
- try {
- conn.createStatement().executeUpdate(
- "DELETE FROM " + QueryServicesOptions.DEFAULT_TRACING_STATS_TABLE_NAME);
- } catch (TableNotFoundException ignore) {
- }
- }
public static Connection getConnectionWithoutTracing() throws SQLException {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -70,7 +57,6 @@ public class BaseTracingTestIT extends ParallelStatsDisabledIT {
public static Connection getConnectionWithoutTracing(Properties props) throws SQLException {
Connection conn = getConnectionWithTracingFrequency(props, Frequency.NEVER);
- conn.setAutoCommit(false);
return conn;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a448200/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
index d6ca23b..8097cc9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
@@ -64,25 +64,26 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
private String enableForLoggingIndex;
private DisableableMetricsWriter sink;
- private String tableName;
+ private String tracingTableName;
@Before
public void setupMetrics() throws Exception {
PhoenixMetricsSink pWriter = new PhoenixMetricsSink();
Connection conn = getConnectionWithoutTracing();
- tableName = generateRandomString();
- pWriter.initForTesting(conn, tableName);
+ tracingTableName = "TRACING_" + generateRandomString();
+ pWriter.initForTesting(conn, tracingTableName);
sink = new DisableableMetricsWriter(pWriter);
enabledForLoggingTable = "ENABLED_FOR_LOGGING_" + generateRandomString();
- enableForLoggingIndex = "ENABLED_FOR_LOGGING_INDEX" + generateRandomString();
+ enableForLoggingIndex = "ENABLED_FOR_LOGGING_INDEX_" + generateRandomString();
- TracingTestUtil.registerSink(sink);
+ TracingTestUtil.registerSink(sink, tracingTableName);
}
@After
public void cleanup() {
sink.disable();
sink.clear();
+ TracingTestUtil.unregisterSink(tracingTableName);
}
private void waitForCommit(CountDownLatch latch) throws SQLException {
@@ -95,7 +96,7 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
sink.disable();
// swap the connection for one that listens
- sink.getDelegate().initForTesting(conn, tableName);
+ sink.getDelegate().initForTesting(conn, tracingTableName);
// enable the writer
sink.enable();
@@ -223,7 +224,7 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
public boolean foundTrace(TraceHolder trace, SpanInfo span) {
String traceInfo = trace.toString();
// skip logging traces that are just traces about tracing
- if (traceInfo.contains(tableName)) {
+ if (traceInfo.contains(tracingTableName)) {
return false;
}
return traceInfo.contains("Completing index");
@@ -247,7 +248,6 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
// create an index on the table - we know indexing has some basic tracing
ddl = "CREATE INDEX IF NOT EXISTS " + enableForLoggingIndex + " on " + enabledForLoggingTable + " (c1)";
conn.createStatement().execute(ddl);
- conn.commit();
}
@Test
@@ -321,14 +321,12 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
stmt.setLong(2, 1);
stmt.execute();
conn.commit();
- conn.rollback();
// setup for next set of updates
stmt.setString(1, "key2");
stmt.setLong(2, 2);
stmt.execute();
conn.commit();
- conn.rollback();
// do a scan of the table
String read = "SELECT COUNT(*) FROM " + enabledForLoggingTable;
@@ -338,7 +336,7 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
assertEquals("Didn't get the expected number of row", 2, results.getInt(1));
results.close();
- assertTrue("Get expected updates to trace table", updated.await(200, TimeUnit.SECONDS));
+ assertTrue("Didn't get expected updates to trace table", updated.await(60, TimeUnit.SECONDS));
// don't trace reads either
boolean found = checkStoredTraces(conn, new TraceChecker() {
@Override
@@ -461,7 +459,7 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
}
private boolean checkStoredTraces(Connection conn, TraceChecker checker) throws Exception {
- TraceReader reader = new TraceReader(conn, tableName);
+ TraceReader reader = new TraceReader(conn, tracingTableName);
int retries = 0;
boolean found = false;
outer: while (retries < MAX_RETRIES) {
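For reference, checkStoredTraces drives a TraceChecker callback over each stored trace, and callers first screen out traces produced by writes to the trace table itself, as in this sketch distilled from the hunk above:

    boolean found = checkStoredTraces(conn, new TraceChecker() {
        @Override
        public boolean foundTrace(TraceHolder trace, SpanInfo span) {
            String traceInfo = trace.toString();
            // ignore traces that are just about writing to the trace table
            if (traceInfo.contains(tracingTableName)) {
                return false;
            }
            return traceInfo.contains("Completing index");
        }
    });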
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a448200/phoenix-core/src/it/java/org/apache/phoenix/trace/TracingTestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/TracingTestUtil.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/TracingTestUtil.java
index b2b12f7..9c539c3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/trace/TracingTestUtil.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/TracingTestUtil.java
@@ -25,7 +25,11 @@ import org.apache.phoenix.metrics.Metrics;
*/
public class TracingTestUtil {
- public static void registerSink(MetricsSink sink){
- Metrics.initialize().register("phoenix", "test sink gets logged", sink);
+ public static void registerSink(MetricsSink sink, String name){
+ Metrics.initialize().register(name, "test sink gets logged", sink);
+ }
+
+ public static void unregisterSink(String name){
+ Metrics.initialize().unregisterSource(name);
}
}
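Registering the sink under a caller-chosen name, instead of the previously hard-coded "phoenix", is what makes unregistration workable: the metrics system can then drop exactly one source by name between tests. A sketch of the intended per-test lifecycle (the name value is hypothetical):

    String sinkName = "TRACING_" + generateRandomString();
    TracingTestUtil.registerSink(sink, sinkName);
    try {
        // ... run operations that emit trace spans ...
    } finally {
        // release the name so a later test can register its own sink
        TracingTestUtil.unregisterSink(sinkName);
    }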
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a448200/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index cf6b00d..29bb885 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -3073,6 +3073,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
HTableInterface htable = this.getTable(SchemaUtil
.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getName());
try {
+ tableStatsCache.invalidateAll();
final Map<byte[], Long> results =
htable.coprocessorService(MetaDataService.class, HConstants.EMPTY_START_ROW,
HConstants.EMPTY_END_ROW, new Batch.Call<MetaDataService, Long>() {
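The added line makes clearCache() also drop the client-side table statistics cache, so a test that compacts a table and then clears caches cannot be served stale guideposts from the client. A minimal sketch of the idea, assuming tableStatsCache is a Guava cache keyed by physical table name (the key and value types are assumptions, not the exact field declaration):

    Cache<ImmutableBytesPtr, PTableStats> tableStatsCache =
            CacheBuilder.newBuilder()
                    .expireAfterWrite(statsUpdateFrequencyMs, TimeUnit.MILLISECONDS)
                    .build();

    // forget everything cached locally before asking the region servers to
    // clear their caches, so the next query re-reads SYSTEM.STATS
    tableStatsCache.invalidateAll();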
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a448200/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 5500e7a..50180d1 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -49,9 +49,16 @@ import java.util.Collections;
import java.util.List;
import java.util.Properties;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@@ -90,6 +97,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.LikeParseNode.LikeType;
+import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
@@ -110,6 +118,8 @@ import com.google.common.collect.Lists;
public class TestUtil {
+ private static final Log LOG = LogFactory.getLog(TestUtil.class);
+
public static final String DEFAULT_SCHEMA_NAME = "";
public static final String DEFAULT_DATA_TABLE_NAME = "T";
public static final String DEFAULT_INDEX_TABLE_NAME = "I";
@@ -713,5 +723,50 @@ public class TestUtil {
+ (options!=null? options : "");
conn.createStatement().execute(ddl);
}
+
+ /**
+ * Runs a major compaction, and then waits until the compaction is complete before returning.
+ *
+ * @param tableName name of the table to be compacted
+ */
+ public static void doMajorCompaction(Connection conn, String tableName) throws Exception {
+
+ tableName = SchemaUtil.normalizeIdentifier(tableName);
+
+ // We simply write a marker row, request a major compaction, and then wait until the marker
+ // row is gone
+ ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+ try (HTableInterface htable = services.getTable(Bytes.toBytes(tableName))) {
+ byte[] markerRowKey = Bytes.toBytes("TO_DELETE");
+
+ Put put = new Put(markerRowKey);
+ put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, HConstants.EMPTY_BYTE_ARRAY,
+ HConstants.EMPTY_BYTE_ARRAY);
+ htable.put(put);
+ htable.delete(new Delete(markerRowKey));
+
+ HBaseAdmin hbaseAdmin = services.getAdmin();
+ hbaseAdmin.flush(tableName);
+ hbaseAdmin.majorCompact(tableName);
+ hbaseAdmin.close();
+
+ boolean compactionDone = false;
+ while (!compactionDone) {
+ Thread.sleep(2000L);
+ Scan scan = new Scan();
+ scan.setStartRow(markerRowKey);
+ scan.setStopRow(Bytes.add(markerRowKey, new byte[] { 0 }));
+ scan.setRaw(true);
+
+ ResultScanner scanner = htable.getScanner(scan);
+ List<Result> results = Lists.newArrayList(scanner);
+ LOG.info("Results: " + results);
+ compactionDone = results.isEmpty();
+ scanner.close();
+
+ LOG.info("Compaction done: " + compactionDone);
+ }
+ }
+ }
}