You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2016/09/22 05:43:52 UTC

[5/6] phoenix git commit: PHOENIX-3304 Tracing tests failing

PHOENIX-3304 Tracing tests failing


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5a448200
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5a448200
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5a448200

Branch: refs/heads/4.x-HBase-1.1
Commit: 5a448200204916c4dfcc87b180b9fcfd05b840e0
Parents: a92f304
Author: James Taylor <ja...@apache.org>
Authored: Tue Sep 20 22:39:19 2016 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Wed Sep 21 22:45:03 2016 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/StatsCollectorIT.java       | 33 ++++------
 .../apache/phoenix/end2end/StoreNullsIT.java    | 68 ++------------------
 .../apache/phoenix/trace/BaseTracingTestIT.java | 14 ----
 .../phoenix/trace/PhoenixTracingEndToEndIT.java | 22 +++----
 .../apache/phoenix/trace/TracingTestUtil.java   |  8 ++-
 .../query/ConnectionQueryServicesImpl.java      |  1 +
 .../java/org/apache/phoenix/util/TestUtil.java  | 55 ++++++++++++++++
 7 files changed, 90 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a448200/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
index dd7741a..9a1ea26 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
@@ -40,8 +40,8 @@ import java.util.Random;
 
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.ConnectionQueryServices;
@@ -77,8 +77,8 @@ public class StatsCollectorIT extends ParallelStatsEnabledIT {
         props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
         props.put(QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB, Boolean.TRUE.toString());
         props.put(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.TRUE.toString());
-        props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(1024));
         props.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.toString(true));
+        props.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
     
@@ -347,16 +347,8 @@ public class StatsCollectorIT extends ParallelStatsEnabledIT {
         return stmt;
     }
 
-    private void compactTable(Connection conn, String tableName) throws IOException, InterruptedException, SQLException {
-        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
-        HBaseAdmin admin = services.getAdmin();
-        try {
-            admin.flush(tableName);
-            admin.majorCompact(tableName);
-            Thread.sleep(10000); // FIXME: how do we know when compaction is done?
-        } finally {
-            admin.close();
-        }
+    private void compactTable(Connection conn, String tableName) throws Exception {
+        TestUtil.doMajorCompaction(conn, tableName);
     }
     
     @Test
@@ -374,9 +366,6 @@ public class StatsCollectorIT extends ParallelStatsEnabledIT {
         Connection conn;
         PreparedStatement stmt;
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        if (minStatsUpdateFreq != null) {
-            props.setProperty(QueryServices.MIN_STATS_UPDATE_FREQ_MS_ATTRIB, minStatsUpdateFreq.toString());
-        }
         conn = DriverManager.getConnection(getUrl(), props);
         conn.createStatement().execute("CREATE TABLE " + tableName + "(k CHAR(1) PRIMARY KEY, v INTEGER, w INTEGER) "
                 + HColumnDescriptor.KEEP_DELETED_CELLS + "=" + Boolean.FALSE);
@@ -391,11 +380,11 @@ public class StatsCollectorIT extends ParallelStatsEnabledIT {
         
         compactTable(conn, tableName);
         if (minStatsUpdateFreq == null) {
-            conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
-        }
-        // Confirm that when we have a non zero MIN_STATS_UPDATE_FREQ_MS_ATTRIB, after we run
-        // UPDATATE STATISTICS, the new statistics are faulted in as expected.
-        if (minStatsUpdateFreq != null) {
+            ImmutableBytesPtr ptr = new ImmutableBytesPtr(Bytes.toBytes(tableName));
+            conn.unwrap(PhoenixConnection.class).getQueryServices().invalidateStats(ptr);
+        } else {
+            // Confirm that when we have a non zero MIN_STATS_UPDATE_FREQ_MS_ATTRIB, after we run
+            // UPDATE STATISTICS, the new statistics are faulted in as expected.
             List<KeyRange>keyRanges = getAllSplits(conn, tableName);
             assertNotEquals(nRows+1, keyRanges.size());
             // If we've set MIN_STATS_UPDATE_FREQ_MS_ATTRIB, an UPDATE STATISTICS will invalidate the cache
@@ -412,7 +401,8 @@ public class StatsCollectorIT extends ParallelStatsEnabledIT {
         
         compactTable(conn, tableName);
         if (minStatsUpdateFreq == null) {
-            conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
+            ImmutableBytesPtr ptr = new ImmutableBytesPtr(Bytes.toBytes(tableName));
+            conn.unwrap(PhoenixConnection.class).getQueryServices().invalidateStats(ptr);
         }
         
         keyRanges = getAllSplits(conn, tableName);
@@ -429,7 +419,6 @@ public class StatsCollectorIT extends ParallelStatsEnabledIT {
                 + PhoenixDatabaseMetaData.SYSTEM_STATS_NAME + " WHERE PHYSICAL_NAME='" + tableName + "'");
         rs.next();
         assertEquals(nRows - nDeletedRows, rs.getLong(1));
-        
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a448200/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java
index 904743a..c14cf39 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java
@@ -28,16 +28,11 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.List;
 import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -46,12 +41,11 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.collect.Lists;
-
 /**
  * Tests to demonstrate and verify the STORE_NULLS option on a table,
  * which allows explicitly storing null values (as opposed to using HBase Deletes) for nulls. This
@@ -132,7 +126,7 @@ public class StoreNullsIT extends ParallelStatsDisabledIT {
     }
 
     @Test
-    public void testQueryingHistory() throws SQLException, InterruptedException, IOException {
+    public void testQueryingHistory() throws Exception {
         stmt.executeUpdate("UPSERT INTO " + WITH_NULLS + " VALUES (1, 'v1')");
         stmt.executeUpdate("UPSERT INTO " + WITHOUT_NULLS + " VALUES (1, 'v1')");
 
@@ -144,8 +138,8 @@ public class StoreNullsIT extends ParallelStatsDisabledIT {
         stmt.executeUpdate("UPSERT INTO " + WITHOUT_NULLS + " VALUES (1, null)");
         Thread.sleep(10L);
 
-        doMajorCompaction(WITH_NULLS);
-        doMajorCompaction(WITHOUT_NULLS);
+        TestUtil.doMajorCompaction(conn, WITH_NULLS);
+        TestUtil.doMajorCompaction(conn, WITHOUT_NULLS);
 
         Properties historicalProps = new Properties();
         historicalProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
@@ -171,7 +165,7 @@ public class StoreNullsIT extends ParallelStatsDisabledIT {
 
     // Row deletes should work in the same way regardless of what STORE_NULLS is set to
     @Test
-    public void testDeletes() throws SQLException, InterruptedException, IOException {
+    public void testDeletes() throws Exception {
         stmt.executeUpdate("UPSERT INTO " + WITH_NULLS + " VALUES (1, 'v1')");
         stmt.executeUpdate("UPSERT INTO " + WITHOUT_NULLS + " VALUES (1, 'v1')");
 
@@ -183,8 +177,8 @@ public class StoreNullsIT extends ParallelStatsDisabledIT {
         stmt.executeUpdate("DELETE FROM " + WITHOUT_NULLS + " WHERE id = 1");
         Thread.sleep(10L);
 
-        doMajorCompaction(WITH_NULLS);
-        doMajorCompaction(WITHOUT_NULLS);
+        TestUtil.doMajorCompaction(conn, WITH_NULLS);
+        TestUtil.doMajorCompaction(conn, WITHOUT_NULLS);
 
         Properties historicalProps = new Properties();
         historicalProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
@@ -221,53 +215,5 @@ public class StoreNullsIT extends ParallelStatsDisabledIT {
         assertTrue(rs.getBoolean(1));
     }
 
-    /**
-     * Runs a major compaction, and then waits until the compaction is complete before returning.
-     *
-     * @param tableName name of the table to be compacted
-     */
-    private void doMajorCompaction(String tableName) throws IOException, InterruptedException {
-
-        tableName = SchemaUtil.normalizeIdentifier(tableName);
-
-        // We simply write a marker row, request a major compaction, and then wait until the marker
-        // row is gone
-        HTable htable = new HTable(getUtility().getConfiguration(), tableName);
-        byte[] markerRowKey = Bytes.toBytes("TO_DELETE");
-
-
-        Put put = new Put(markerRowKey);
-        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, HConstants.EMPTY_BYTE_ARRAY,
-                HConstants.EMPTY_BYTE_ARRAY);
-        htable.put(put);
-        htable.delete(new Delete(markerRowKey));
-        htable.close();
-
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(getUtility().getConfiguration());
-        hbaseAdmin.flush(tableName);
-        hbaseAdmin.majorCompact(tableName);
-        hbaseAdmin.close();
-
-        boolean compactionDone = false;
-        while (!compactionDone) {
-            Thread.sleep(2000L);
-            htable = new HTable(getUtility().getConfiguration(), tableName);
-            Scan scan = new Scan();
-            scan.setStartRow(markerRowKey);
-            scan.setStopRow(Bytes.add(markerRowKey, new byte[] { 0 }));
-            scan.setRaw(true);
-
-            ResultScanner scanner = htable.getScanner(scan);
-            List<Result> results = Lists.newArrayList(scanner);
-            LOG.info("Results: " + results);
-            compactionDone = results.isEmpty();
-            scanner.close();
-
-            LOG.info("Compaction done: " + compactionDone);
-        }
-
-        htable.close();
-    }
-
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a448200/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java
index 7e5d17f..eed5618 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java
@@ -38,13 +38,10 @@ import org.apache.hadoop.metrics2.impl.ExposedMetricsRecordImpl;
 import org.apache.hadoop.metrics2.lib.ExposedMetricsInfoImpl;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
 import org.apache.phoenix.metrics.MetricInfo;
-import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.trace.util.Tracing.Frequency;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
-import org.junit.Before;
 
 /**
  * Base test for tracing tests - helps manage getting tracing/non-tracing
@@ -52,16 +49,6 @@ import org.junit.Before;
  */
 
 public class BaseTracingTestIT extends ParallelStatsDisabledIT {
-    @Before
-    public void resetTracingTableIfExists() throws Exception {
-        Connection conn = getConnectionWithoutTracing();
-        conn.setAutoCommit(true);
-        try {
-            conn.createStatement().executeUpdate(
-                    "DELETE FROM " + QueryServicesOptions.DEFAULT_TRACING_STATS_TABLE_NAME);
-        } catch (TableNotFoundException ignore) {
-        }
-    }
 
     public static Connection getConnectionWithoutTracing() throws SQLException {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -70,7 +57,6 @@ public class BaseTracingTestIT extends ParallelStatsDisabledIT {
 
     public static Connection getConnectionWithoutTracing(Properties props) throws SQLException {
         Connection conn = getConnectionWithTracingFrequency(props, Frequency.NEVER);
-        conn.setAutoCommit(false);
         return conn;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a448200/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
index d6ca23b..8097cc9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
@@ -64,25 +64,26 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
     private String enableForLoggingIndex;
 
     private DisableableMetricsWriter sink;
-    private String tableName;
+    private String tracingTableName;
 
     @Before
     public void setupMetrics() throws Exception {
         PhoenixMetricsSink pWriter = new PhoenixMetricsSink();
         Connection conn = getConnectionWithoutTracing();
-        tableName = generateRandomString();
-        pWriter.initForTesting(conn, tableName);
+        tracingTableName = "TRACING_" + generateRandomString();
+        pWriter.initForTesting(conn, tracingTableName);
         sink = new DisableableMetricsWriter(pWriter);
         enabledForLoggingTable = "ENABLED_FOR_LOGGING_" + generateRandomString();
-        enableForLoggingIndex = "ENABALED_FOR_LOGGING_INDEX" + generateRandomString();
+        enableForLoggingIndex = "ENABALED_FOR_LOGGING_INDEX_" + generateRandomString();
 
-        TracingTestUtil.registerSink(sink);
+        TracingTestUtil.registerSink(sink, tracingTableName);
     }
 
     @After
     public void cleanup() {
         sink.disable();
         sink.clear();
+        TracingTestUtil.unregisterSink(tracingTableName);
     }
 
     private void waitForCommit(CountDownLatch latch) throws SQLException {
@@ -95,7 +96,7 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
         sink.disable();
 
         // swap the connection for one that listens
-        sink.getDelegate().initForTesting(conn, tableName);
+        sink.getDelegate().initForTesting(conn, tracingTableName);
 
         // enable the writer
         sink.enable();
@@ -223,7 +224,7 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
             public boolean foundTrace(TraceHolder trace, SpanInfo span) {
                 String traceInfo = trace.toString();
                 // skip logging traces that are just traces about tracing
-                if (traceInfo.contains(tableName)) {
+                if (traceInfo.contains(tracingTableName)) {
                     return false;
                 }
                 return traceInfo.contains("Completing index");
@@ -247,7 +248,6 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
         // create an index on the table - we know indexing has some basic tracing
         ddl = "CREATE INDEX IF NOT EXISTS " + enableForLoggingIndex + " on " + enabledForLoggingTable + " (c1)";
         conn.createStatement().execute(ddl);
-        conn.commit();
     }
 
     @Test
@@ -321,14 +321,12 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
         stmt.setLong(2, 1);
         stmt.execute();
         conn.commit();
-        conn.rollback();
 
         // setup for next set of updates
         stmt.setString(1, "key2");
         stmt.setLong(2, 2);
         stmt.execute();
         conn.commit();
-        conn.rollback();
 
         // do a scan of the table
         String read = "SELECT COUNT(*) FROM " + enabledForLoggingTable;
@@ -338,7 +336,7 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
         assertEquals("Didn't get the expected number of row", 2, results.getInt(1));
         results.close();
 
-        assertTrue("Get expected updates to trace table", updated.await(200, TimeUnit.SECONDS));
+        assertTrue("Didn't get expected updates to trace table", updated.await(60, TimeUnit.SECONDS));
         // don't trace reads either
         boolean found = checkStoredTraces(conn, new TraceChecker() {
             @Override
@@ -461,7 +459,7 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
     }
 
     private boolean checkStoredTraces(Connection conn, TraceChecker checker) throws Exception {
-        TraceReader reader = new TraceReader(conn, tableName);
+        TraceReader reader = new TraceReader(conn, tracingTableName);
         int retries = 0;
         boolean found = false;
         outer: while (retries < MAX_RETRIES) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a448200/phoenix-core/src/it/java/org/apache/phoenix/trace/TracingTestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/TracingTestUtil.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/TracingTestUtil.java
index b2b12f7..9c539c3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/trace/TracingTestUtil.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/TracingTestUtil.java
@@ -25,7 +25,11 @@ import org.apache.phoenix.metrics.Metrics;
  */
 public class TracingTestUtil {
 
-    public static void registerSink(MetricsSink sink){
-        Metrics.initialize().register("phoenix", "test sink gets logged", sink);
+    public static void registerSink(MetricsSink sink, String name){
+        Metrics.initialize().register(name, "test sink gets logged", sink);
+    }
+
+    public static void unregisterSink(String name){
+        Metrics.initialize().unregisterSource(name);
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a448200/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index cf6b00d..29bb885 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -3073,6 +3073,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             HTableInterface htable = this.getTable(SchemaUtil
                     .getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getName());
             try {
+                tableStatsCache.invalidateAll();
                 final Map<byte[], Long> results =
                         htable.coprocessorService(MetaDataService.class, HConstants.EMPTY_START_ROW,
                                 HConstants.EMPTY_END_ROW, new Batch.Call<MetaDataService, Long>() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a448200/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 5500e7a..50180d1 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -49,9 +49,16 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@@ -90,6 +97,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.LikeParseNode.LikeType;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
@@ -110,6 +118,8 @@ import com.google.common.collect.Lists;
 
 
 public class TestUtil {
+    private static final Log LOG = LogFactory.getLog(TestUtil.class);
+    
     public static final String DEFAULT_SCHEMA_NAME = "";
     public static final String DEFAULT_DATA_TABLE_NAME = "T";
     public static final String DEFAULT_INDEX_TABLE_NAME = "I";
@@ -713,5 +723,50 @@ public class TestUtil {
                 + (options!=null? options : "");
             conn.createStatement().execute(ddl);
     }
+
+    /**
+     * Flushes and major-compacts a table, then polls every two seconds until a raw scan of a marker row is empty.
+     * @param conn connection unwrapped to {@code PhoenixConnection} to obtain the table and admin handles
+     * @param tableName name of the table to be compacted; normalized via SchemaUtil.normalizeIdentifier first
+     */
+    public static void doMajorCompaction(Connection conn, String tableName) throws Exception {
+    
+        tableName = SchemaUtil.normalizeIdentifier(tableName);
+    
+        // Write a marker row and delete it right away, flush, request a major compaction, and then
+        // poll with a raw scan until nothing is returned for the marker row key
+        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        try (HTableInterface htable = services.getTable(Bytes.toBytes(tableName))) {
+            byte[] markerRowKey = Bytes.toBytes("TO_DELETE");
+        
+            Put put = new Put(markerRowKey);
+            put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, HConstants.EMPTY_BYTE_ARRAY,
+                    HConstants.EMPTY_BYTE_ARRAY);
+            htable.put(put);
+            htable.delete(new Delete(markerRowKey));
+        
+            HBaseAdmin hbaseAdmin = services.getAdmin();
+            hbaseAdmin.flush(tableName);
+            hbaseAdmin.majorCompact(tableName);
+            hbaseAdmin.close(); // NOTE(review): not in a finally block, so skipped if flush/majorCompact throws
+        
+            boolean compactionDone = false;
+            while (!compactionDone) { // NOTE(review): no timeout or retry limit on this loop
+                Thread.sleep(2000L); // poll interval: 2 seconds
+                Scan scan = new Scan();
+                scan.setStartRow(markerRowKey);
+                scan.setStopRow(Bytes.add(markerRowKey, new byte[] { 0 })); // presumably limits the scan to the marker row alone - confirm
+                scan.setRaw(true); // presumably so delete markers are still returned until compaction purges them - confirm
+        
+                ResultScanner scanner = htable.getScanner(scan);
+                List<Result> results = Lists.newArrayList(scanner);
+                LOG.info("Results: " + results);
+                compactionDone = results.isEmpty();
+                scanner.close(); // NOTE(review): not reached if draining the scanner above throws
+        
+                LOG.info("Compaction done: " + compactionDone);
+            }
+        }
+    }
 }