You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/10/28 20:44:46 UTC
[2/2] git commit: PHOENIX-1390 Stats not updated on client after
major compaction
PHOENIX-1390 Stats not updated on client after major compaction
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7a8a023a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7a8a023a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7a8a023a
Branch: refs/heads/4.0
Commit: 7a8a023a3d2bdb694f02fc1560a0f5eb35294a96
Parents: 851f57a
Author: James Taylor <jt...@salesforce.com>
Authored: Tue Oct 28 12:44:37 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Tue Oct 28 12:44:37 2014 -0700
----------------------------------------------------------------------
.../end2end/BaseClientManagedTimeIT.java | 15 +++-
.../org/apache/phoenix/end2end/BaseQueryIT.java | 3 +-
.../end2end/ClientTimeArithmeticQueryIT.java | 43 +++++++++++
.../phoenix/end2end/InMemoryOrderByIT.java | 4 +-
.../org/apache/phoenix/end2end/QueryIT.java | 24 ++++--
.../apache/phoenix/end2end/ReverseScanIT.java | 2 +-
.../org/apache/phoenix/end2end/SequenceIT.java | 7 +-
.../phoenix/end2end/SpooledOrderByIT.java | 4 +-
.../phoenix/end2end/StatsCollectorIT.java | 55 +++++++++++++-
.../apache/phoenix/end2end/UpsertSelectIT.java | 4 +-
.../phoenix/compile/ExpressionCompiler.java | 2 +-
.../coprocessor/MetaDataEndpointImpl.java | 35 ++-------
.../UngroupedAggregateRegionObserver.java | 21 ++++--
.../org/apache/phoenix/query/QueryServices.java | 1 +
.../phoenix/query/QueryServicesOptions.java | 5 +-
.../apache/phoenix/schema/MetaDataClient.java | 4 +-
.../org/apache/phoenix/schema/PTableImpl.java | 2 +-
.../phoenix/schema/stats/PTableStats.java | 7 ++
.../phoenix/schema/stats/PTableStatsImpl.java | 12 ++-
.../schema/stats/StatisticsCollector.java | 79 ++++++++++++--------
.../phoenix/schema/stats/StatisticsScanner.java | 1 -
.../phoenix/schema/stats/StatisticsUtil.java | 6 +-
.../phoenix/schema/stats/StatisticsWriter.java | 39 ++++++----
23 files changed, 259 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java
index 14dffcb..1acd5b3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java
@@ -17,16 +17,21 @@
*/
package org.apache.phoenix.end2end;
+import java.util.Map;
+
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
+import com.google.common.collect.Maps;
+
/**
* Base class for tests that manage their own time stamps
* We need to separate these from tests that rely on hbase to set
@@ -54,9 +59,17 @@ public abstract class BaseClientManagedTimeIT extends BaseTest {
deletePriorTables(ts - 1, getUrl());
}
+ public static Map<String,String> getDefaultProps() {
+ Map<String,String> props = Maps.newHashMapWithExpectedSize(5);
+ // Must update config before starting server
+ props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, Boolean.FALSE.toString());
+ return props;
+ }
+
@BeforeClass
public static void doSetup() throws Exception {
- setUpTestDriver(ReadOnlyProps.EMPTY_PROPS);
+ Map<String,String> props = getDefaultProps();
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java
index 7a3e86e..f3031f4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java
@@ -48,7 +48,6 @@ import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
@@ -70,7 +69,7 @@ public abstract class BaseQueryIT extends BaseClientManagedTimeIT {
@BeforeClass
@Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
public static void doSetup() throws Exception {
- Map<String,String> props = Maps.newHashMapWithExpectedSize(5);
+ Map<String,String> props = getDefaultProps();
props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000));
props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100));
// Make a small batch size to test multiple calls to reserve sequences
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientTimeArithmeticQueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientTimeArithmeticQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientTimeArithmeticQueryIT.java
index 98b233c..d709b9c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientTimeArithmeticQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientTimeArithmeticQueryIT.java
@@ -49,6 +49,7 @@ import java.util.Properties;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.TestUtil;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -596,5 +597,47 @@ public class ClientTimeArithmeticQueryIT extends BaseQueryIT {
}
}
+ @Test
+ public void testDateDateSubtract() throws Exception {
+ String url;
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+ url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 15);
+ Connection conn = DriverManager.getConnection(url, props);
+ PreparedStatement statement = conn.prepareStatement("UPSERT INTO ATABLE(organization_id,entity_id,a_time) VALUES(?,?,?)");
+ statement.setString(1, getOrganizationId());
+ statement.setString(2, ROW2);
+ statement.setDate(3, date);
+ statement.execute();
+ statement.setString(2, ROW3);
+ statement.setDate(3, date);
+ statement.execute();
+ statement.setString(2, ROW4);
+ statement.setDate(3, new Date(date.getTime() + TestUtil.MILLIS_IN_DAY - 1));
+ statement.execute();
+ statement.setString(2, ROW6);
+ statement.setDate(3, new Date(date.getTime() + TestUtil.MILLIS_IN_DAY - 1));
+ statement.execute();
+ statement.setString(2, ROW9);
+ statement.setDate(3, date);
+ statement.execute();
+ conn.commit();
+ conn.close();
+
+ url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 25);
+ conn = DriverManager.getConnection(url, props);
+ try {
+ statement = conn.prepareStatement("SELECT entity_id, b_string FROM ATABLE WHERE a_date - a_time > 1");
+ ResultSet rs = statement.executeQuery();
+ @SuppressWarnings("unchecked")
+ List<List<Object>> expectedResults = Lists.newArrayList(
+ Arrays.<Object>asList(ROW3, E_VALUE),
+ Arrays.<Object>asList( ROW6, E_VALUE),
+ Arrays.<Object>asList(ROW9, E_VALUE));
+ assertValuesEqualsResultSet(rs, expectedResults);
+ } finally {
+ conn.close();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InMemoryOrderByIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InMemoryOrderByIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InMemoryOrderByIT.java
index 48a0581..533143c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InMemoryOrderByIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InMemoryOrderByIT.java
@@ -24,8 +24,6 @@ import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
-import com.google.common.collect.Maps;
-
@Category(ClientManagedTimeTest.class)
public class InMemoryOrderByIT extends OrderByIT {
@@ -35,7 +33,7 @@ public class InMemoryOrderByIT extends OrderByIT {
@BeforeClass
@Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
public static void doSetup() throws Exception {
- Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+ Map<String,String> props = getDefaultProps();
props.put(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1024*1024));
// Must update config before starting server
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java
index f45b689..fe65e10 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java
@@ -250,7 +250,7 @@ public class QueryIT extends BaseQueryIT {
@Test
public void testPointInTimeScan() throws Exception {
// Override value that was set at creation time
- String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 1); // Run query at timestamp 5
+ String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 10);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection upsertConn = DriverManager.getConnection(url, props);
String upsertStmt =
@@ -267,13 +267,15 @@ public class QueryIT extends BaseQueryIT {
stmt.setString(2, ROW4);
stmt.setInt(3, 5);
stmt.execute(); // should commit too
- Connection conn1 = DriverManager.getConnection(getUrl(), props);
+
+ url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 15);
+ Connection conn1 = DriverManager.getConnection(url, props);
analyzeTable(conn1, "ATABLE");
conn1.close();
upsertConn.close();
// Override value again, but should be ignored since it's past the SCN
- url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 3); // Run query at timestamp 5
+ url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 30);
upsertConn = DriverManager.getConnection(url, props);
upsertConn.setAutoCommit(true); // Test auto commit
// Insert all rows at ts
@@ -285,7 +287,7 @@ public class QueryIT extends BaseQueryIT {
upsertConn.close();
String query = "SELECT organization_id, a_string AS a FROM atable WHERE organization_id=? and a_integer = 5";
- props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2));
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 20));
Connection conn = DriverManager.getConnection(getUrl(), props);
PreparedStatement statement = conn.prepareStatement(query);
statement.setString(1, tenantId);
@@ -394,7 +396,7 @@ public class QueryIT extends BaseQueryIT {
" A_TIMESTAMP) " +
"VALUES (?, ?, ?)";
// Override value that was set at creation time
- String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 1);
+ String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 10);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection upsertConn = DriverManager.getConnection(url, props);
upsertConn.setAutoCommit(true); // Test auto commit
@@ -405,9 +407,12 @@ public class QueryIT extends BaseQueryIT {
byte[] ts1 = PDataType.TIMESTAMP.toBytes(tsValue1);
stmt.setTimestamp(3, tsValue1);
stmt.execute();
- Connection conn1 = DriverManager.getConnection(getUrl(), props);
+
+ url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 15);
+ Connection conn1 = DriverManager.getConnection(url, props);
analyzeTable(conn1, "ATABLE");
conn1.close();
+
updateStmt =
"upsert into " +
"ATABLE(" +
@@ -426,15 +431,18 @@ public class QueryIT extends BaseQueryIT {
stmt.setTime(4, new Time(tsValue2.getTime()));
stmt.execute();
upsertConn.close();
- conn1 = DriverManager.getConnection(getUrl(), props);
+
+ url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 20);
+ conn1 = DriverManager.getConnection(url, props);
analyzeTable(conn1, "ATABLE");
conn1.close();
+
analyzeTable(upsertConn, "ATABLE");
assertTrue(compare(CompareOp.GREATER, new ImmutableBytesWritable(ts2), new ImmutableBytesWritable(ts1)));
assertFalse(compare(CompareOp.GREATER, new ImmutableBytesWritable(ts1), new ImmutableBytesWritable(ts1)));
String query = "SELECT entity_id, a_timestamp, a_time FROM aTable WHERE organization_id=? and a_timestamp > ?";
- props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 3)); // Execute at timestamp 2
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 30)); // Execute at timestamp 2
Connection conn = DriverManager.getConnection(getUrl(), props);
try {
PreparedStatement statement = conn.prepareStatement(query);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java
index 26d6d4b..e279710 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java
@@ -50,7 +50,7 @@ import com.google.common.collect.Maps;
@Category(HBaseManagedTimeTest.class)
public class ReverseScanIT extends BaseHBaseManagedTimeIT {
@BeforeClass
- @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
+ @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
public static void doSetup() throws Exception {
Map<String,String> props = Maps.newHashMapWithExpectedSize(1);
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
index 4f2b9a9..b4b0b2e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
@@ -51,7 +51,6 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
@Category(ClientManagedTimeTest.class)
public class SequenceIT extends BaseClientManagedTimeIT {
@@ -63,11 +62,9 @@ public class SequenceIT extends BaseClientManagedTimeIT {
@BeforeClass
@Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
public static void doSetup() throws Exception {
-
- Map<String,String> props = Maps.newHashMapWithExpectedSize(1);
- // Make a small batch size to test multiple calls to reserve sequences
- props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB, Long.toString(BATCH_SIZE));
+ Map<String,String> props = getDefaultProps();
// Must update config before starting server
+ props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB, Long.toString(BATCH_SIZE));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledOrderByIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledOrderByIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledOrderByIT.java
index 2533a29..c35ecab 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledOrderByIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledOrderByIT.java
@@ -24,15 +24,13 @@ import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
-import com.google.common.collect.Maps;
-
@Category(ClientManagedTimeTest.class)
public class SpooledOrderByIT extends OrderByIT {
@BeforeClass
@Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
public static void doSetup() throws Exception {
- Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+ Map<String,String> props = getDefaultProps();
props.put(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(100));
// Must update config before starting server
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
index 51ad543..b9a0e88 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
@@ -18,6 +18,8 @@
package org.apache.phoenix.end2end;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.getAllSplits;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@@ -27,9 +29,15 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -41,7 +49,8 @@ import com.google.common.collect.Maps;
@Category(NeedsOwnMiniClusterTest.class)
public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
-
+ private static final String STATS_TEST_TABLE_NAME = "S";
+
@BeforeClass
public static void doSetup() throws Exception {
Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
@@ -222,4 +231,48 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
return stmt;
}
+ private void compactTable(Connection conn) throws IOException, InterruptedException, SQLException {
+ ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+ HBaseAdmin admin = services.getAdmin();
+ try {
+ admin.flush(STATS_TEST_TABLE_NAME);
+ admin.majorCompact(STATS_TEST_TABLE_NAME);
+ Thread.sleep(10000); // FIXME: how do we know when compaction is done?
+ } finally {
+ admin.close();
+ }
+ services.clearCache();
+ }
+
+ @Test
+ public void testCompactUpdatesStats() throws Exception {
+ int nRows = 10;
+ Connection conn;
+ PreparedStatement stmt;
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ conn = DriverManager.getConnection(getUrl(), props);
+ conn.createStatement().execute("CREATE TABLE " + STATS_TEST_TABLE_NAME + "(k CHAR(1) PRIMARY KEY, v INTEGER) " + HColumnDescriptor.KEEP_DELETED_CELLS + "=" + Boolean.FALSE);
+ stmt = conn.prepareStatement("UPSERT INTO " + STATS_TEST_TABLE_NAME + " VALUES(?,?)");
+ for (int i = 0; i < nRows; i++) {
+ stmt.setString(1, Character.toString((char) ('a' + i)));
+ stmt.setInt(2, i);
+ stmt.executeUpdate();
+ }
+ conn.commit();
+
+ compactTable(conn);
+ conn = DriverManager.getConnection(getUrl(), props);
+ List<KeyRange>keyRanges = getAllSplits(conn, STATS_TEST_TABLE_NAME);
+ assertEquals(nRows+1, keyRanges.size());
+
+ int nDeletedRows = conn.createStatement().executeUpdate("DELETE FROM " + STATS_TEST_TABLE_NAME + " WHERE V < 5");
+ conn.commit();
+ assertEquals(5, nDeletedRows);
+
+ compactTable(conn);
+
+ keyRanges = getAllSplits(conn, STATS_TEST_TABLE_NAME);
+ assertEquals(nRows/2+1, keyRanges.size());
+
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
index 642ba62..ac54fe4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
@@ -55,8 +55,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import com.google.common.collect.Maps;
-
@Category(ClientManagedTimeTest.class)
public class UpsertSelectIT extends BaseClientManagedTimeIT {
@@ -64,7 +62,7 @@ public class UpsertSelectIT extends BaseClientManagedTimeIT {
@BeforeClass
@Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
public static void doSetup() throws Exception {
- Map<String,String> props = Maps.newHashMapWithExpectedSize(5);
+ Map<String,String> props = getDefaultProps();
props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(500));
props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(64));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
index e06a88f..3876b8a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
@@ -891,7 +891,7 @@ public class ExpressionCompiler extends UnsupportedAllParseNodeVisitor<Expressio
if (isType1Date || isType2Date) {
if (isType1Date && isType2Date) {
i = 2;
- theType = PDataType.LONG;
+ theType = PDataType.DECIMAL;
} else if (isType1Date && type2 != null
&& type2.isCoercibleTo(PDataType.DECIMAL)) {
i = 2;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index b90fb2e..3abd206 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -608,6 +608,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
try {
statsHTable = ServerUtil.getHTableForCoprocessorScan(env, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME);
stats = StatisticsUtil.readStatistics(statsHTable, physicalTableName.getBytes(), clientTimeStamp);
+ timeStamp = Math.max(timeStamp, stats.getTimestamp());
} catch (org.apache.hadoop.hbase.TableNotFoundException e) {
logger.warn(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME + " not online yet?");
} finally {
@@ -1264,32 +1265,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
}
- private PTable incrementTableTimestamp(byte[] key, long clientTimeStamp) throws IOException, SQLException {
- HRegion region = env.getRegion();
- RowLock lid = region.getRowLock(key);
- if (lid == null) {
- throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
- }
- try {
- PTable table = doGetTable(key, clientTimeStamp, lid);
- if (table != null) {
- long tableTimeStamp = table.getTimeStamp() + 1;
- List<Mutation> mutations = Lists.newArrayListWithExpectedSize(1);
- Put p = new Put(key);
- p.add(TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, tableTimeStamp, ByteUtil.EMPTY_BYTE_ARRAY);
- mutations.add(p);
- region.mutateRowsWithLocks(mutations, Collections.<byte[]> emptySet());
-
- Cache<ImmutableBytesPtr, PTable> metaDataCache = GlobalCache.getInstance(env).getMetaDataCache();
- ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
- metaDataCache.invalidate(cacheKey);
- }
- return table;
- } finally {
- lid.release();
- }
- }
-
private PTable doGetTable(byte[] key, long clientTimeStamp) throws IOException, SQLException {
return doGetTable(key, clientTimeStamp, null);
}
@@ -1711,9 +1686,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
byte[] tableName = request.getTableName().toByteArray();
try {
byte[] tenantId = request.getTenantId().toByteArray();
- long clientTimeStamp = request.getClientTimestamp();
- byte[] tableKey = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
- incrementTableTimestamp(tableKey, clientTimeStamp);
+ byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
+ ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
+ Cache<ImmutableBytesPtr, PTable> metaDataCache =
+ GlobalCache.getInstance(this.env).getMetaDataCache();
+ metaDataCache.invalidate(cacheKey);
} catch (Throwable t) {
logger.error("incrementTableTimeStamp failed", t);
ProtobufUtil.setControllerException(controller,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 710409f..aba35fe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -72,6 +72,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.join.TupleProjector;
import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.ConstraintViolationException;
import org.apache.phoenix.schema.PColumn;
@@ -459,9 +460,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)
&& scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
try {
- // TODO: for users that manage timestamps themselves, we should provide
- // a means of specifying/getting this.
- long clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime();
+ boolean useCurrentTime =
+ c.getEnvironment().getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
+ QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME);
+ // Provides a means of clients controlling their timestamps to not use current time
+ // when background tasks are updating stats. Instead we track the max timestamp of
+ // the cells and use that.
+ long clientTimeStamp = useCurrentTime ? TimeKeeper.SYSTEM.getCurrentTime() : StatisticsCollector.NO_TIMESTAMP;
StatisticsCollector stats = new StatisticsCollector(c.getEnvironment(), table.getNameAsString(), clientTimeStamp);
internalScan =
stats.createCompactionScanner(c.getEnvironment().getRegion(), store, scanners, scanType, earliestPutTs, s);
@@ -485,9 +490,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) {
StatisticsCollector stats = null;
try {
- // TODO: for users that manage timestamps themselves, we should provide
- // a means of specifying/getting this.
- long clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime();
+ boolean useCurrentTime =
+ e.getEnvironment().getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
+ QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME);
+ // Provides a means of clients controlling their timestamps to not use current time
+ // when background tasks are updating stats. Instead we track the max timestamp of
+ // the cells and use that.
+ long clientTimeStamp = useCurrentTime ? TimeKeeper.SYSTEM.getCurrentTime() : StatisticsCollector.NO_TIMESTAMP;
stats = new StatisticsCollector(e.getEnvironment(), table.getNameAsString(), clientTimeStamp);
stats.collectStatsDuringSplit(e.getEnvironment().getConfiguration(), l, r, region);
} catch (IOException ioe) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 7f000c0..72002ae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -136,6 +136,7 @@ public interface QueryServices extends SQLCloseable {
public static final String MIN_STATS_UPDATE_FREQ_MS_ATTRIB = "phoenix.stats.minUpdateFrequency";
public static final String STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB = "phoenix.stats.guidepost.width";
public static final String STATS_GUIDEPOST_PER_REGION_ATTRIB = "phoenix.stats.guidepost.per.region";
+ public static final String STATS_USE_CURRENT_TIME_ATTRIB = "phoenix.stats.useCurrentTime";
public static final String SEQUENCE_SALT_BUCKETS_ATTRIB = "phoenix.sequence.saltBuckets";
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 7ee225b..7c8ecd4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -52,6 +52,7 @@ import static org.apache.phoenix.query.QueryServices.SPOOL_DIRECTORY;
import static org.apache.phoenix.query.QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB;
import static org.apache.phoenix.query.QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB;
import static org.apache.phoenix.query.QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.STATS_USE_CURRENT_TIME_ATTRIB;
import static org.apache.phoenix.query.QueryServices.THREAD_POOL_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.THREAD_TIMEOUT_MS_ATTRIB;
import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB;
@@ -146,7 +147,8 @@ public class QueryServicesOptions {
public static final double DEFAULT_TRACING_PROBABILITY_THRESHOLD = 0.05;
public static final int DEFAULT_STATS_UPDATE_FREQ_MS = 15 * 60000; // 15min
- public static final int DEFAULT_GUIDE_POSTS_PER_REGION = 20;
+ public static final long DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES = 100 * 1024 * 1024; // 100MB
+ public static final boolean DEFAULT_STATS_USE_CURRENT_TIME = true;
public static final boolean DEFAULT_USE_REVERSE_SCAN = true;
@@ -175,6 +177,7 @@ public class QueryServicesOptions {
public static QueryServicesOptions withDefaults() {
Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
QueryServicesOptions options = new QueryServicesOptions(config)
+ .setIfUnset(STATS_USE_CURRENT_TIME_ATTRIB, DEFAULT_STATS_USE_CURRENT_TIME)
.setIfUnset(KEEP_ALIVE_MS_ATTRIB, DEFAULT_KEEP_ALIVE_MS)
.setIfUnset(THREAD_POOL_SIZE_ATTRIB, DEFAULT_THREAD_POOL_SIZE)
.setIfUnset(QUEUE_SIZE_ATTRIB, DEFAULT_QUEUE_SIZE)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index afe21e8..b763bbb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -613,13 +613,13 @@ public class MetaDataClient {
Long scn = connection.getSCN();
// Always invalidate the cache
long clientTimeStamp = connection.getSCN() == null ? HConstants.LATEST_TIMESTAMP : scn;
- String query = "SELECT CURRENT_DATE() - " + LAST_STATS_UPDATE_TIME + " FROM " + PhoenixDatabaseMetaData.SYSTEM_STATS_NAME
+ String query = "SELECT CURRENT_DATE()," + LAST_STATS_UPDATE_TIME + " FROM " + PhoenixDatabaseMetaData.SYSTEM_STATS_NAME
+ " WHERE " + PHYSICAL_NAME + "='" + physicalName.getString() + "' AND " + COLUMN_FAMILY
+ " IS NULL AND " + REGION_NAME + " IS NULL AND " + LAST_STATS_UPDATE_TIME + " IS NOT NULL";
ResultSet rs = connection.createStatement().executeQuery(query);
long msSinceLastUpdate = Long.MAX_VALUE;
if (rs.next()) {
- msSinceLastUpdate = rs.getLong(1);
+ msSinceLastUpdate = rs.getLong(1) - rs.getLong(2);
}
if (msSinceLastUpdate < msMinBetweenUpdates) {
return 0;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 2448f39..8f85ccc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -886,7 +886,7 @@ public class PTableImpl implements PTable {
GuidePostsInfo info = new GuidePostsInfo(pTableStatsProto.getGuidePostsByteCount(), value);
tableGuidePosts.put(pTableStatsProto.getKey().toByteArray(), info);
}
- PTableStats stats = new PTableStatsImpl(tableGuidePosts);
+ PTableStats stats = new PTableStatsImpl(tableGuidePosts, timeStamp);
PName dataTableName = null;
if (table.hasDataTableNameBytes()) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java
index 3745487..435fe87 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java
@@ -37,6 +37,11 @@ public interface PTableStats {
public int getEstimatedSize() {
return 0;
}
+
+ @Override
+ public long getTimestamp() {
+ return StatisticsCollector.NO_TIMESTAMP;
+ }
};
/**
@@ -47,4 +52,6 @@ public interface PTableStats {
SortedMap<byte[], GuidePostsInfo> getGuidePosts();
int getEstimatedSize();
+
+ long getTimestamp();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java
index dcf7b00..dc70e86 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java
@@ -23,6 +23,7 @@ import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.util.SizedUtil;
import com.sun.istack.NotNull;
@@ -33,13 +34,15 @@ import com.sun.istack.NotNull;
public class PTableStatsImpl implements PTableStats {
private final SortedMap<byte[], GuidePostsInfo> guidePosts;
private final int estimatedSize;
+ private final long timeStamp;
public PTableStatsImpl() {
- this(new TreeMap<byte[], GuidePostsInfo>(Bytes.BYTES_COMPARATOR));
+ this(new TreeMap<byte[], GuidePostsInfo>(Bytes.BYTES_COMPARATOR), MetaDataProtocol.MIN_TABLE_TIMESTAMP);
}
- public PTableStatsImpl(@NotNull SortedMap<byte[], GuidePostsInfo> guidePosts) {
+ public PTableStatsImpl(@NotNull SortedMap<byte[], GuidePostsInfo> guidePosts, long timeStamp) {
this.guidePosts = guidePosts;
+ this.timeStamp = timeStamp;
int estimatedSize = SizedUtil.OBJECT_SIZE + SizedUtil.INT_SIZE + SizedUtil.sizeOfTreeMap(guidePosts.size());
for (Map.Entry<byte[], GuidePostsInfo> entry : guidePosts.entrySet()) {
byte[] cf = entry.getKey();
@@ -84,4 +87,9 @@ public class PTableStatsImpl implements PTableStats {
public int getEstimatedSize() {
return estimatedSize;
}
+
+ @Override
+ public long getTimestamp() {
+ return timeStamp;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
index 53bd18a..3511d12 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
@@ -23,8 +23,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
@@ -45,12 +43,15 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.TimeKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -64,33 +65,45 @@ import com.google.common.collect.Maps;
* board for now.
*/
public class StatisticsCollector {
+ private static final Logger logger = LoggerFactory.getLogger(StatisticsCollector.class);
+ public static final long NO_TIMESTAMP = -1;
private Map<String, byte[]> minMap = Maps.newHashMap();
private Map<String, byte[]> maxMap = Maps.newHashMap();
private long guidepostDepth;
+ private boolean useCurrentTime;
+ private long maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
private Map<String, Pair<Long,GuidePostsInfo>> guidePostsMap = Maps.newHashMap();
// Tracks the bytecount per family if it has reached the guidePostsDepth
private Map<ImmutableBytesPtr, Boolean> familyMap = Maps.newHashMap();
protected StatisticsWriter statsTable;
- // Ensures that either analyze or compaction happens at any point of time.
- private static final Log LOG = LogFactory.getLog(StatisticsCollector.class);
public StatisticsCollector(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp) throws IOException {
Configuration config = env.getConfiguration();
HTableInterface statsHTable = env.getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES));
- long maxFileSize = statsHTable.getTableDescriptor().getMaxFileSize();
- if (maxFileSize <= 0) { // HBase brain dead API doesn't give you the "real" max file size if it's not set...
- maxFileSize = HConstants.DEFAULT_MAX_FILE_SIZE;
+ useCurrentTime =
+ config.getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
+ QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME);
+ int guidepostPerRegion = config.getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB, 0);
+ if (guidepostPerRegion > 0) {
+ long maxFileSize = statsHTable.getTableDescriptor().getMaxFileSize();
+ if (maxFileSize <= 0) { // HBase brain dead API doesn't give you the "real" max file size if it's not set...
+ maxFileSize = HConstants.DEFAULT_MAX_FILE_SIZE;
+ }
+ guidepostDepth = maxFileSize / guidepostPerRegion;
+ } else {
+ guidepostDepth = config.getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
+ QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES);
}
- guidepostDepth =
- config.getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
- maxFileSize / config.getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
- QueryServicesOptions.DEFAULT_GUIDE_POSTS_PER_REGION));
// Get the stats table associated with the current table on which the CP is
// triggered
this.statsTable = StatisticsWriter.newWriter(statsHTable, tableName, clientTimeStamp);
}
+ public long getMaxTimeStamp() {
+ return maxTimeStamp;
+ }
+
public void close() throws IOException {
this.statsTable.close();
}
@@ -99,12 +112,12 @@ public class StatisticsCollector {
try {
ArrayList<Mutation> mutations = new ArrayList<Mutation>();
writeStatsToStatsTable(region, true, mutations, TimeKeeper.SYSTEM.getCurrentTime());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Committing new stats for the region " + region.getRegionInfo());
+ if (logger.isDebugEnabled()) {
+ logger.debug("Committing new stats for the region " + region.getRegionInfo());
}
commitStats(mutations);
} catch (IOException e) {
- LOG.error(e);
+ logger.error("Unable to commit new stats", e);
} finally {
clear();
}
@@ -116,20 +129,20 @@ public class StatisticsCollector {
// update the statistics table
for (ImmutableBytesPtr fam : familyMap.keySet()) {
if (delete) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Deleting the stats for the region "+region.getRegionInfo());
+ if(logger.isDebugEnabled()) {
+ logger.debug("Deleting the stats for the region "+region.getRegionInfo());
}
statsTable.deleteStats(region.getRegionInfo().getRegionNameAsString(), this, Bytes.toString(fam.copyBytesIfNecessary()),
mutations);
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("Adding new stats for the region "+region.getRegionInfo());
+ if(logger.isDebugEnabled()) {
+ logger.debug("Adding new stats for the region "+region.getRegionInfo());
}
statsTable.addStats((region.getRegionInfo().getRegionNameAsString()), this, Bytes.toString(fam.copyBytesIfNecessary()),
mutations);
}
} catch (IOException e) {
- LOG.error("Failed to update statistics table!", e);
+ logger.error("Failed to update statistics table!", e);
throw e;
}
}
@@ -147,7 +160,7 @@ public class StatisticsCollector {
mutations);
}
} catch (IOException e) {
- LOG.error("Failed to delete from statistics table!", e);
+ logger.error("Failed to delete from statistics table!", e);
throw e;
}
}
@@ -195,8 +208,8 @@ public class StatisticsCollector {
internalScan = new StoreScanner(store, store.getScanInfo(), scan, scanners, scanType,
smallestReadPoint, earliestPutTs);
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Compaction scanner created for stats");
+ if (logger.isDebugEnabled()) {
+ logger.debug("Compaction scanner created for stats");
}
InternalScanner scanner = getInternalScanner(region, store, internalScan, store.getColumnFamilyName());
if (scanner != null) {
@@ -212,22 +225,22 @@ public class StatisticsCollector {
// Create a delete operation on the parent region
// Then write the new guide posts for individual regions
List<Mutation> mutations = Lists.newArrayListWithExpectedSize(3);
- long currentTime = TimeKeeper.SYSTEM.getCurrentTime();
+ long currentTime = useCurrentTime ? TimeKeeper.SYSTEM.getCurrentTime() : -1;
deleteStatsFromStatsTable(region, mutations, currentTime);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Collecting stats for the daughter region " + l.getRegionInfo());
+ if (logger.isDebugEnabled()) {
+ logger.debug("Collecting stats for the daughter region " + l.getRegionInfo());
}
collectStatsForSplitRegions(conf, l, mutations, currentTime);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Collecting stats for the daughter region " + r.getRegionInfo());
+ if (logger.isDebugEnabled()) {
+ logger.debug("Collecting stats for the daughter region " + r.getRegionInfo());
}
collectStatsForSplitRegions(conf, r, mutations, currentTime);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Committing stats for the daughter regions as part of split " + r.getRegionInfo());
+ if (logger.isDebugEnabled()) {
+ logger.debug("Committing stats for the daughter regions as part of split " + r.getRegionInfo());
}
commitStats(mutations);
} catch (IOException e) {
- LOG.error("Error while capturing stats after split of region "
+ logger.error("Error while capturing stats after split of region "
+ region.getRegionInfo().getRegionNameAsString(), e);
}
}
@@ -244,13 +257,13 @@ public class StatisticsCollector {
count = scanRegion(scanner, count);
writeStatsToStatsTable(daughter, false, mutations, currentTime);
} catch (IOException e) {
- LOG.error(e);
+ logger.error("Unable to collect stats during split", e);
toThrow = e;
} finally {
try {
if (scanner != null) scanner.close();
} catch (IOException e) {
- LOG.error(e);
+ logger.error("Unable to close scanner after split", e);
if (toThrow != null) toThrow = e;
} finally {
if (toThrow != null) throw toThrow;
@@ -278,6 +291,7 @@ public class StatisticsCollector {
this.minMap.clear();
this.guidePostsMap.clear();
this.familyMap.clear();
+ maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
}
public void updateStatistic(KeyValue kv) {
@@ -302,6 +316,7 @@ public class StatisticsCollector {
maxMap.put(fam, row);
}
}
+ maxTimeStamp = Math.max(maxTimeStamp, kv.getTimestamp());
// TODO : This can be moved to an interface so that we could collect guide posts in different ways
Pair<Long,GuidePostsInfo> gps = guidePostsMap.get(fam);
if (gps == null) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
index 60b9601..3a84cfc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
@@ -35,7 +35,6 @@ public class StatisticsScanner implements InternalScanner {
public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter stats, HRegion region,
InternalScanner delegate, byte[] family) {
- // should there be only one tracker?
this.tracker = tracker;
this.stats = stats;
this.delegate = delegate;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
index b8d64bd..eb183e6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
@@ -72,6 +72,7 @@ public class StatisticsUtil {
s.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES);
ResultScanner scanner = statsHTable.getScanner(s);
Result result = null;
+ long timeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
TreeMap<byte[], GuidePostsInfo> guidePostsPerCf = new TreeMap<byte[], GuidePostsInfo>(Bytes.BYTES_COMPARATOR);
while ((result = scanner.next()) != null) {
CellScanner cellScanner = result.cellScanner();
@@ -88,10 +89,13 @@ public class StatisticsUtil {
if (oldInfo != null) {
newInfo.combine(oldInfo);
}
+ if (current.getTimestamp() > timeStamp) {
+ timeStamp = current.getTimestamp();
+ }
}
}
if (!guidePostsPerCf.isEmpty()) {
- return new PTableStatsImpl(guidePostsPerCf);
+ return new PTableStatsImpl(guidePostsPerCf, timeStamp);
}
return PTableStats.EMPTY_STATS;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
index 6da135e..22f0ead 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
@@ -62,7 +62,9 @@ public class StatisticsWriter implements Closeable {
clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime();
}
StatisticsWriter statsTable = new StatisticsWriter(hTable, tableName, clientTimeStamp);
- statsTable.commitLastStatsUpdatedTime();
+ if (clientTimeStamp != StatisticsCollector.NO_TIMESTAMP) { // Otherwise we do this later as we don't know the ts yet
+ statsTable.commitLastStatsUpdatedTime();
+ }
return statsTable;
}
@@ -101,26 +103,31 @@ public class StatisticsWriter implements Closeable {
*/
public void addStats(String regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations) throws IOException {
if (tracker == null) { return; }
-
+ boolean useMaxTimeStamp = clientTimeStamp == StatisticsCollector.NO_TIMESTAMP;
+ long timeStamp = clientTimeStamp;
+ if (useMaxTimeStamp) { // When using max timestamp, we write the update time later because we only know the ts now
+ timeStamp = tracker.getMaxTimeStamp();
+ mutations.add(getLastStatsUpdatedTimePut(timeStamp));
+ }
byte[] prefix = StatisticsUtil.getRowKey(tableName, PDataType.VARCHAR.toBytes(fam),
PDataType.VARCHAR.toBytes(regionName));
Put put = new Put(prefix);
GuidePostsInfo gp = tracker.getGuidePosts(fam);
if (gp != null) {
put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_COUNT_BYTES,
- clientTimeStamp, PDataType.LONG.toBytes((gp.getGuidePosts().size())));
+ timeStamp, PDataType.LONG.toBytes((gp.getGuidePosts().size())));
put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES,
- clientTimeStamp, PDataType.VARBINARY.toBytes(gp.toBytes()));
+ timeStamp, PDataType.VARBINARY.toBytes(gp.toBytes()));
put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES,
- clientTimeStamp, PDataType.LONG.toBytes(gp.getByteCount()));
+ timeStamp, PDataType.LONG.toBytes(gp.getByteCount()));
}
put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MIN_KEY_BYTES,
- clientTimeStamp, PDataType.VARBINARY.toBytes(tracker.getMinKey(fam)));
+ timeStamp, PDataType.VARBINARY.toBytes(tracker.getMinKey(fam)));
put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MAX_KEY_BYTES,
- clientTimeStamp, PDataType.VARBINARY.toBytes(tracker.getMaxKey(fam)));
+ timeStamp, PDataType.VARBINARY.toBytes(tracker.getMaxKey(fam)));
// Add our empty column value so queries behave correctly
put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES,
- clientTimeStamp, ByteUtil.EMPTY_BYTE_ARRAY);
+ timeStamp, ByteUtil.EMPTY_BYTE_ARRAY);
mutations.add(put);
}
@@ -153,21 +160,27 @@ public class StatisticsWriter implements Closeable {
}
}
- private void commitLastStatsUpdatedTime() throws IOException {
- // Always use wallclock time for this, as it's a mechanism to prevent
- // stats from being collected too often.
+ private Put getLastStatsUpdatedTimePut(long timeStamp) {
long currentTime = TimeKeeper.SYSTEM.getCurrentTime();
byte[] prefix = tableName;
Put put = new Put(prefix);
- put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME_BYTES, clientTimeStamp,
+ put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME_BYTES, timeStamp,
PDataType.DATE.toBytes(new Date(currentTime)));
+ return put;
+ }
+
+ private void commitLastStatsUpdatedTime() throws IOException {
+ // Always use wallclock time for this, as it's a mechanism to prevent
+ // stats from being collected too often.
+ Put put = getLastStatsUpdatedTimePut(clientTimeStamp);
statisticsTable.put(put);
}
public void deleteStats(String regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations)
throws IOException {
+ long timeStamp = clientTimeStamp == StatisticsCollector.NO_TIMESTAMP ? tracker.getMaxTimeStamp() : clientTimeStamp;
byte[] prefix = StatisticsUtil.getRowKey(tableName, PDataType.VARCHAR.toBytes(fam),
PDataType.VARCHAR.toBytes(regionName));
- mutations.add(new Delete(prefix, clientTimeStamp - 1));
+ mutations.add(new Delete(prefix, timeStamp - 1));
}
}
\ No newline at end of file