You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/10/29 01:02:07 UTC
[12/15] git commit: PHOENIX-1390 Stats not updated on client after
major compaction
PHOENIX-1390 Stats not updated on client after major compaction
Conflicts:
phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
Conflicts:
phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java
phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java
phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java
phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6c94dc6e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6c94dc6e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6c94dc6e
Branch: refs/heads/3.2
Commit: 6c94dc6eb62c1cb4e255d9817af8c985be739785
Parents: 72144f1
Author: James Taylor <jt...@salesforce.com>
Authored: Tue Oct 28 12:44:37 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Tue Oct 28 15:18:39 2014 -0700
----------------------------------------------------------------------
.../end2end/BaseClientManagedTimeIT.java | 15 +-
.../org/apache/phoenix/end2end/BaseQueryIT.java | 3 +-
.../end2end/ClientTimeArithmeticQueryIT.java | 43 ++++++
.../phoenix/end2end/InMemoryOrderByIT.java | 4 +-
.../org/apache/phoenix/end2end/QueryIT.java | 24 ++-
.../org/apache/phoenix/end2end/SequenceIT.java | 7 +-
.../phoenix/end2end/SpooledOrderByIT.java | 4 +-
.../phoenix/end2end/StatsCollectorIT.java | 55 ++++++-
.../apache/phoenix/end2end/UpsertSelectIT.java | 4 +-
.../phoenix/compile/ExpressionCompiler.java | 2 +-
.../coprocessor/MetaDataEndpointImpl.java | 35 +----
.../UngroupedAggregateRegionObserver.java | 21 ++-
.../org/apache/phoenix/query/QueryServices.java | 1 +
.../phoenix/query/QueryServicesOptions.java | 7 +-
.../apache/phoenix/schema/MetaDataClient.java | 4 +-
.../phoenix/schema/stats/PTableStats.java | 7 +
.../phoenix/schema/stats/PTableStatsImpl.java | 12 +-
.../schema/stats/StatisticsCollector.java | 154 +++++++++++--------
.../phoenix/schema/stats/StatisticsScanner.java | 1 -
.../phoenix/schema/stats/StatisticsUtil.java | 6 +-
.../phoenix/schema/stats/StatisticsWriter.java | 39 +++--
21 files changed, 297 insertions(+), 151 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6c94dc6e/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java
index 14dffcb..1acd5b3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java
@@ -17,16 +17,21 @@
*/
package org.apache.phoenix.end2end;
+import java.util.Map;
+
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
+import com.google.common.collect.Maps;
+
/**
* Base class for tests that manage their own time stamps
* We need to separate these from tests that rely on hbase to set
@@ -54,9 +59,17 @@ public abstract class BaseClientManagedTimeIT extends BaseTest {
deletePriorTables(ts - 1, getUrl());
}
+ public static Map<String,String> getDefaultProps() {
+ Map<String,String> props = Maps.newHashMapWithExpectedSize(5);
+ // Must update config before starting server
+ props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, Boolean.FALSE.toString());
+ return props;
+ }
+
@BeforeClass
public static void doSetup() throws Exception {
- setUpTestDriver(ReadOnlyProps.EMPTY_PROPS);
+ Map<String,String> props = getDefaultProps();
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6c94dc6e/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java
index 25b947d..2b00096 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java
@@ -52,7 +52,6 @@ import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -75,7 +74,7 @@ public abstract class BaseQueryIT extends BaseClientManagedTimeIT {
@BeforeClass
@Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
public static void doSetup() throws Exception {
- Map<String,String> props = Maps.newHashMapWithExpectedSize(5);
+ Map<String,String> props = getDefaultProps();
props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000));
props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100));
// Make a small batch size to test multiple calls to reserve sequences
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6c94dc6e/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientTimeArithmeticQueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientTimeArithmeticQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientTimeArithmeticQueryIT.java
index 98b233c..d709b9c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientTimeArithmeticQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientTimeArithmeticQueryIT.java
@@ -49,6 +49,7 @@ import java.util.Properties;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.TestUtil;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -596,5 +597,47 @@ public class ClientTimeArithmeticQueryIT extends BaseQueryIT {
}
}
+ @Test
+ public void testDateDateSubtract() throws Exception {
+ String url;
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+ url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 15);
+ Connection conn = DriverManager.getConnection(url, props);
+ PreparedStatement statement = conn.prepareStatement("UPSERT INTO ATABLE(organization_id,entity_id,a_time) VALUES(?,?,?)");
+ statement.setString(1, getOrganizationId());
+ statement.setString(2, ROW2);
+ statement.setDate(3, date);
+ statement.execute();
+ statement.setString(2, ROW3);
+ statement.setDate(3, date);
+ statement.execute();
+ statement.setString(2, ROW4);
+ statement.setDate(3, new Date(date.getTime() + TestUtil.MILLIS_IN_DAY - 1));
+ statement.execute();
+ statement.setString(2, ROW6);
+ statement.setDate(3, new Date(date.getTime() + TestUtil.MILLIS_IN_DAY - 1));
+ statement.execute();
+ statement.setString(2, ROW9);
+ statement.setDate(3, date);
+ statement.execute();
+ conn.commit();
+ conn.close();
+
+ url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 25);
+ conn = DriverManager.getConnection(url, props);
+ try {
+ statement = conn.prepareStatement("SELECT entity_id, b_string FROM ATABLE WHERE a_date - a_time > 1");
+ ResultSet rs = statement.executeQuery();
+ @SuppressWarnings("unchecked")
+ List<List<Object>> expectedResults = Lists.newArrayList(
+ Arrays.<Object>asList(ROW3, E_VALUE),
+ Arrays.<Object>asList( ROW6, E_VALUE),
+ Arrays.<Object>asList(ROW9, E_VALUE));
+ assertValuesEqualsResultSet(rs, expectedResults);
+ } finally {
+ conn.close();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6c94dc6e/phoenix-core/src/it/java/org/apache/phoenix/end2end/InMemoryOrderByIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InMemoryOrderByIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InMemoryOrderByIT.java
index 48a0581..533143c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InMemoryOrderByIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InMemoryOrderByIT.java
@@ -24,8 +24,6 @@ import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
-import com.google.common.collect.Maps;
-
@Category(ClientManagedTimeTest.class)
public class InMemoryOrderByIT extends OrderByIT {
@@ -35,7 +33,7 @@ public class InMemoryOrderByIT extends OrderByIT {
@BeforeClass
@Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
public static void doSetup() throws Exception {
- Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+ Map<String,String> props = getDefaultProps();
props.put(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1024*1024));
// Must update config before starting server
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6c94dc6e/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java
index a537087..36d800a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java
@@ -248,7 +248,7 @@ public class QueryIT extends BaseQueryIT {
@Test
public void testPointInTimeScan() throws Exception {
// Override value that was set at creation time
- String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 1); // Run query at timestamp 5
+ String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 10);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection upsertConn = DriverManager.getConnection(url, props);
String upsertStmt =
@@ -265,13 +265,15 @@ public class QueryIT extends BaseQueryIT {
stmt.setString(2, ROW4);
stmt.setInt(3, 5);
stmt.execute(); // should commit too
- Connection conn1 = DriverManager.getConnection(getUrl(), props);
+
+ url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 15);
+ Connection conn1 = DriverManager.getConnection(url, props);
analyzeTable(conn1, "ATABLE");
conn1.close();
upsertConn.close();
// Override value again, but should be ignored since it's past the SCN
- url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 3); // Run query at timestamp 5
+ url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 30);
upsertConn = DriverManager.getConnection(url, props);
upsertConn.setAutoCommit(true); // Test auto commit
// Insert all rows at ts
@@ -283,7 +285,7 @@ public class QueryIT extends BaseQueryIT {
upsertConn.close();
String query = "SELECT organization_id, a_string AS a FROM atable WHERE organization_id=? and a_integer = 5";
- props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2));
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 20));
Connection conn = DriverManager.getConnection(getUrl(), props);
PreparedStatement statement = conn.prepareStatement(query);
statement.setString(1, tenantId);
@@ -392,7 +394,7 @@ public class QueryIT extends BaseQueryIT {
" A_TIMESTAMP) " +
"VALUES (?, ?, ?)";
// Override value that was set at creation time
- String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 1);
+ String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 10);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection upsertConn = DriverManager.getConnection(url, props);
upsertConn.setAutoCommit(true); // Test auto commit
@@ -403,9 +405,12 @@ public class QueryIT extends BaseQueryIT {
byte[] ts1 = PDataType.TIMESTAMP.toBytes(tsValue1);
stmt.setTimestamp(3, tsValue1);
stmt.execute();
- Connection conn1 = DriverManager.getConnection(getUrl(), props);
+
+ url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 15);
+ Connection conn1 = DriverManager.getConnection(url, props);
analyzeTable(conn1, "ATABLE");
conn1.close();
+
updateStmt =
"upsert into " +
"ATABLE(" +
@@ -424,15 +429,18 @@ public class QueryIT extends BaseQueryIT {
stmt.setTime(4, new Time(tsValue2.getTime()));
stmt.execute();
upsertConn.close();
- conn1 = DriverManager.getConnection(getUrl(), props);
+
+ url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 20);
+ conn1 = DriverManager.getConnection(url, props);
analyzeTable(conn1, "ATABLE");
conn1.close();
+
analyzeTable(upsertConn, "ATABLE");
assertTrue(compare(CompareOp.GREATER, new ImmutableBytesWritable(ts2), new ImmutableBytesWritable(ts1)));
assertFalse(compare(CompareOp.GREATER, new ImmutableBytesWritable(ts1), new ImmutableBytesWritable(ts1)));
String query = "SELECT entity_id, a_timestamp, a_time FROM aTable WHERE organization_id=? and a_timestamp > ?";
- props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 3)); // Execute at timestamp 2
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 30)); // Execute at timestamp 2
Connection conn = DriverManager.getConnection(getUrl(), props);
try {
PreparedStatement statement = conn.prepareStatement(query);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6c94dc6e/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
index d610633..acaa1bb 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
@@ -51,7 +51,6 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
@Category(ClientManagedTimeTest.class)
public class SequenceIT extends BaseClientManagedTimeIT {
@@ -63,11 +62,9 @@ public class SequenceIT extends BaseClientManagedTimeIT {
@BeforeClass
@Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
public static void doSetup() throws Exception {
-
- Map<String,String> props = Maps.newHashMapWithExpectedSize(1);
- // Make a small batch size to test multiple calls to reserve sequences
- props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB, Long.toString(BATCH_SIZE));
+ Map<String,String> props = getDefaultProps();
// Must update config before starting server
+ props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB, Long.toString(BATCH_SIZE));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6c94dc6e/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledOrderByIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledOrderByIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledOrderByIT.java
index 2533a29..c35ecab 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledOrderByIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledOrderByIT.java
@@ -24,15 +24,13 @@ import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
-import com.google.common.collect.Maps;
-
@Category(ClientManagedTimeTest.class)
public class SpooledOrderByIT extends OrderByIT {
@BeforeClass
@Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
public static void doSetup() throws Exception {
- Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+ Map<String,String> props = getDefaultProps();
props.put(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(100));
// Must update config before starting server
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6c94dc6e/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
index 51ad543..b9a0e88 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
@@ -18,6 +18,8 @@
package org.apache.phoenix.end2end;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.getAllSplits;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@@ -27,9 +29,15 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -41,7 +49,8 @@ import com.google.common.collect.Maps;
@Category(NeedsOwnMiniClusterTest.class)
public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
-
+ private static final String STATS_TEST_TABLE_NAME = "S";
+
@BeforeClass
public static void doSetup() throws Exception {
Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
@@ -222,4 +231,48 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
return stmt;
}
+ private void compactTable(Connection conn) throws IOException, InterruptedException, SQLException {
+ ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+ HBaseAdmin admin = services.getAdmin();
+ try {
+ admin.flush(STATS_TEST_TABLE_NAME);
+ admin.majorCompact(STATS_TEST_TABLE_NAME);
+ Thread.sleep(10000); // FIXME: how do we know when compaction is done?
+ } finally {
+ admin.close();
+ }
+ services.clearCache();
+ }
+
+ @Test
+ public void testCompactUpdatesStats() throws Exception {
+ int nRows = 10;
+ Connection conn;
+ PreparedStatement stmt;
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ conn = DriverManager.getConnection(getUrl(), props);
+ conn.createStatement().execute("CREATE TABLE " + STATS_TEST_TABLE_NAME + "(k CHAR(1) PRIMARY KEY, v INTEGER) " + HColumnDescriptor.KEEP_DELETED_CELLS + "=" + Boolean.FALSE);
+ stmt = conn.prepareStatement("UPSERT INTO " + STATS_TEST_TABLE_NAME + " VALUES(?,?)");
+ for (int i = 0; i < nRows; i++) {
+ stmt.setString(1, Character.toString((char) ('a' + i)));
+ stmt.setInt(2, i);
+ stmt.executeUpdate();
+ }
+ conn.commit();
+
+ compactTable(conn);
+ conn = DriverManager.getConnection(getUrl(), props);
+ List<KeyRange>keyRanges = getAllSplits(conn, STATS_TEST_TABLE_NAME);
+ assertEquals(nRows+1, keyRanges.size());
+
+ int nDeletedRows = conn.createStatement().executeUpdate("DELETE FROM " + STATS_TEST_TABLE_NAME + " WHERE V < 5");
+ conn.commit();
+ assertEquals(5, nDeletedRows);
+
+ compactTable(conn);
+
+ keyRanges = getAllSplits(conn, STATS_TEST_TABLE_NAME);
+ assertEquals(nRows/2+1, keyRanges.size());
+
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6c94dc6e/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
index 642ba62..ac54fe4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
@@ -55,8 +55,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import com.google.common.collect.Maps;
-
@Category(ClientManagedTimeTest.class)
public class UpsertSelectIT extends BaseClientManagedTimeIT {
@@ -64,7 +62,7 @@ public class UpsertSelectIT extends BaseClientManagedTimeIT {
@BeforeClass
@Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
public static void doSetup() throws Exception {
- Map<String,String> props = Maps.newHashMapWithExpectedSize(5);
+ Map<String,String> props = getDefaultProps();
props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(500));
props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(64));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6c94dc6e/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
index 135ef01..409950c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
@@ -871,7 +871,7 @@ public class ExpressionCompiler extends UnsupportedAllParseNodeVisitor<Expressio
if (isType1Date || isType2Date) {
if (isType1Date && isType2Date) {
i = 2;
- theType = PDataType.LONG;
+ theType = PDataType.DECIMAL;
} else if (isType1Date && type2 != null
&& type2.isCoercibleTo(PDataType.DECIMAL)) {
i = 2;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6c94dc6e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index f7c0aae..38277c5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -465,6 +465,7 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
try {
statsHTable = getEnvironment().getTable(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES);
stats = StatisticsUtil.readStatistics(statsHTable, physicalTableName.getBytes(), clientTimeStamp);
+ timeStamp = Math.max(timeStamp, stats.getTimestamp());
} catch (org.apache.hadoop.hbase.TableNotFoundException e) {
logger.warn(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME + " not online yet?");
} finally {
@@ -1119,33 +1120,6 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
return null; // impossible
}
}
-
- private PTable incrementTableTimestamp(byte[] key, long clientTimeStamp) throws IOException, SQLException {
- RegionCoprocessorEnvironment env = getEnvironment();
- HRegion region = env.getRegion();
- Integer lid = region.getLock(null, key, true);
- if (lid == null) {
- throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
- }
- try {
- PTable table = doGetTable(key, clientTimeStamp, lid);
- if (table != null) {
- long tableTimeStamp = table.getTimeStamp() + 1;
- List<Mutation> mutations = Lists.newArrayListWithExpectedSize(1);
- Put p = new Put(key);
- p.add(TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, tableTimeStamp, ByteUtil.EMPTY_BYTE_ARRAY);
- mutations.add(p);
- region.mutateRowsWithLocks(mutations, Collections.<byte[]> emptySet());
-
- Cache<ImmutableBytesPtr, PTable> metaDataCache = GlobalCache.getInstance(getEnvironment()).getMetaDataCache();
- ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
- metaDataCache.invalidate(cacheKey);
- }
- return table;
- } finally {
- region.releaseRowLock(lid);
- }
- }
private PTable doGetTable(byte[] key, long clientTimeStamp) throws IOException, SQLException {
return doGetTable(key, clientTimeStamp, null);
@@ -1413,8 +1387,11 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
public void incrementTableTimeStamp(byte[] tenantId, byte[] schemaName, byte[] tableName, final long clientTimeStamp)
throws IOException {
try {
- byte[] tableKey = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
- incrementTableTimestamp(tableKey, clientTimeStamp);
+ byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
+ ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
+ Cache<ImmutableBytesPtr, PTable> metaDataCache =
+ GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
+ metaDataCache.invalidate(cacheKey);
} catch (Throwable t) {
ServerUtil.throwIOException(SchemaUtil.getTableName(schemaName, tableName), t);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6c94dc6e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 556d69d..e4215e2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -68,6 +68,7 @@ import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.ConstraintViolationException;
import org.apache.phoenix.schema.PColumn;
@@ -433,9 +434,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
if (!table.equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)
&& scanType.equals(ScanType.MAJOR_COMPACT)) {
try {
- // TODO: for users that manage timestamps themselves, we should provide
- // a means of specifying/getting this.
- long clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime();
+ boolean useCurrentTime =
+ c.getEnvironment().getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
+ QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME);
+ // Provides a means of clients controlling their timestamps to not use current time
+ // when background tasks are updating stats. Instead we track the max timestamp of
+ // the cells and use that.
+ long clientTimeStamp = useCurrentTime ? TimeKeeper.SYSTEM.getCurrentTime() : StatisticsCollector.NO_TIMESTAMP;
StatisticsCollector stats = new StatisticsCollector(c.getEnvironment(), table, clientTimeStamp);
internalScan =
stats.createCompactionScanner(c.getEnvironment().getRegion(), store, scanners, scanType, earliestPutTs, s);
@@ -459,9 +464,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
if (!table.equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) {
StatisticsCollector stats = null;
try {
- // TODO: for users that manage timestamps themselves, we should provide
- // a means of specifying/getting this.
- long clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime();
+ boolean useCurrentTime =
+ e.getEnvironment().getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
+ QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME);
+ // Provides a means of clients controlling their timestamps to not use current time
+ // when background tasks are updating stats. Instead we track the max timestamp of
+ // the cells and use that.
+ long clientTimeStamp = useCurrentTime ? TimeKeeper.SYSTEM.getCurrentTime() : StatisticsCollector.NO_TIMESTAMP;
stats = new StatisticsCollector(e.getEnvironment(), table, clientTimeStamp);
stats.collectStatsDuringSplit(e.getEnvironment().getConfiguration(), l, r, region);
} catch (IOException ioe) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6c94dc6e/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index a8536a4..d3faf2e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -124,6 +124,7 @@ public interface QueryServices extends SQLCloseable {
public static final String MIN_STATS_UPDATE_FREQ_MS_ATTRIB = "phoenix.stats.minUpdateFrequency";
public static final String STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB = "phoenix.stats.guidepost.width";
public static final String STATS_GUIDEPOST_PER_REGION_ATTRIB = "phoenix.stats.guidepost.per.region";
+ public static final String STATS_USE_CURRENT_TIME_ATTRIB = "phoenix.stats.useCurrentTime";
public static final String SEQUENCE_SALT_BUCKETS_ATTRIB = "phoenix.sequence.saltBuckets";
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6c94dc6e/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index ad2b48a..117f285 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -52,6 +52,7 @@ import static org.apache.phoenix.query.QueryServices.SPOOL_DIRECTORY;
import static org.apache.phoenix.query.QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB;
import static org.apache.phoenix.query.QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB;
import static org.apache.phoenix.query.QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.STATS_USE_CURRENT_TIME_ATTRIB;
import static org.apache.phoenix.query.QueryServices.THREAD_POOL_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.THREAD_TIMEOUT_MS_ATTRIB;
import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB;
@@ -131,8 +132,9 @@ public class QueryServicesOptions {
public static final long DEFAULT_STATS_HISTOGRAM_DEPTH_BYTE = 1024 * 1024 * 30;
public static final int DEFAULT_STATS_UPDATE_FREQ_MS = 15 * 60000; // 15min
- public static final int DEFAULT_GUIDE_POSTS_PER_REGION = 20;
-
+ public static final long DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES = 100 * 1024 *1024; // 100MB
+ public static final boolean DEFAULT_STATS_USE_CURRENT_TIME = true;
+
public static final boolean DEFAULT_USE_REVERSE_SCAN = true;
/**
@@ -160,6 +162,7 @@ public class QueryServicesOptions {
public static QueryServicesOptions withDefaults() {
Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
QueryServicesOptions options = new QueryServicesOptions(config)
+ .setIfUnset(STATS_USE_CURRENT_TIME_ATTRIB, DEFAULT_STATS_USE_CURRENT_TIME)
.setIfUnset(KEEP_ALIVE_MS_ATTRIB, DEFAULT_KEEP_ALIVE_MS)
.setIfUnset(THREAD_POOL_SIZE_ATTRIB, DEFAULT_THREAD_POOL_SIZE)
.setIfUnset(QUEUE_SIZE_ATTRIB, DEFAULT_QUEUE_SIZE)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6c94dc6e/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 0efbad6..7421f86 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -594,13 +594,13 @@ public class MetaDataClient {
Long scn = connection.getSCN();
// Always invalidate the cache
long clientTimeStamp = connection.getSCN() == null ? HConstants.LATEST_TIMESTAMP : scn;
- String query = "SELECT CURRENT_DATE() - " + LAST_STATS_UPDATE_TIME + " FROM " + PhoenixDatabaseMetaData.SYSTEM_STATS_NAME
+ String query = "SELECT CURRENT_DATE()," + LAST_STATS_UPDATE_TIME + " FROM " + PhoenixDatabaseMetaData.SYSTEM_STATS_NAME
+ " WHERE " + PHYSICAL_NAME + "='" + physicalName.getString() + "' AND " + COLUMN_FAMILY
+ " IS NULL AND " + REGION_NAME + " IS NULL AND " + LAST_STATS_UPDATE_TIME + " IS NOT NULL";
ResultSet rs = connection.createStatement().executeQuery(query);
long msSinceLastUpdate = Long.MAX_VALUE;
if (rs.next()) {
- msSinceLastUpdate = rs.getLong(1);
+ msSinceLastUpdate = rs.getLong(1) - rs.getLong(2);
}
if (msSinceLastUpdate < msMinBetweenUpdates) {
return 0;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6c94dc6e/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java
index 2c26739..ce6a9fc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java
@@ -57,6 +57,11 @@ public interface PTableStats extends Writable {
public int getEstimatedSize() {
return 0;
}
+
+ @Override
+ public long getTimestamp() {
+ return StatisticsCollector.NO_TIMESTAMP;
+ }
};
/**
@@ -67,4 +72,6 @@ public interface PTableStats extends Writable {
SortedMap<byte[], GuidePostsInfo> getGuidePosts();
int getEstimatedSize();
+
+ long getTimestamp();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6c94dc6e/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java
index fab9f52..8531979 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java
@@ -28,6 +28,7 @@ import java.util.TreeMap;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.util.SizedUtil;
import com.google.common.collect.Lists;
@@ -40,13 +41,15 @@ import com.sun.istack.NotNull;
public class PTableStatsImpl implements PTableStats {
private final SortedMap<byte[], GuidePostsInfo> guidePosts;
private final int estimatedSize;
+ private final long timeStamp;
public PTableStatsImpl() {
- this(new TreeMap<byte[], GuidePostsInfo>(Bytes.BYTES_COMPARATOR));
+ this(new TreeMap<byte[], GuidePostsInfo>(Bytes.BYTES_COMPARATOR), MetaDataProtocol.MIN_TABLE_TIMESTAMP);
}
- public PTableStatsImpl(@NotNull SortedMap<byte[], GuidePostsInfo> guidePosts) {
+ public PTableStatsImpl(@NotNull SortedMap<byte[], GuidePostsInfo> guidePosts, long timeStamp) {
this.guidePosts = guidePosts;
+ this.timeStamp = timeStamp;
int estimatedSize = SizedUtil.OBJECT_SIZE + SizedUtil.INT_SIZE + SizedUtil.sizeOfTreeMap(guidePosts.size());
for (Map.Entry<byte[], GuidePostsInfo> entry : guidePosts.entrySet()) {
byte[] cf = entry.getKey();
@@ -119,4 +122,9 @@ public class PTableStatsImpl implements PTableStats {
public int getEstimatedSize() {
return estimatedSize;
}
+
+ @Override
+ public long getTimestamp() {
+ return timeStamp;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6c94dc6e/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
index 2e7bfd9..3bdb9a3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
@@ -14,8 +14,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
@@ -33,12 +31,15 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.TimeKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -52,35 +53,48 @@ import com.google.common.collect.Maps;
* board for now.
*/
public class StatisticsCollector {
+ private static final Logger logger = LoggerFactory.getLogger(StatisticsCollector.class);
+ public static final long NO_TIMESTAMP = -1;
+
+
private Map<String, byte[]> minMap = Maps.newHashMap();
private Map<String, byte[]> maxMap = Maps.newHashMap();
private long guidepostDepth;
+ private boolean useCurrentTime;
+ private long maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
private Map<String, Pair<Long,GuidePostsInfo>> guidePostsMap = Maps.newHashMap();
// Tracks the bytecount per family if it has reached the guidePostsDepth
private Map<ImmutableBytesPtr, Boolean> familyMap = Maps.newHashMap();
protected StatisticsWriter statsTable;
- // Ensures that either analyze or compaction happens at any point of time.
- private static final Log LOG = LogFactory.getLog(StatisticsCollector.class);
public StatisticsCollector(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp)
throws IOException {
Configuration config = env.getConfiguration();
HTableInterface statsHTable = env.getTable((PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES));
- long maxFileSize = statsHTable.getTableDescriptor().getMaxFileSize();
- if (maxFileSize <= 0) { // HBase brain dead API doesn't give you the "real" max file size if it's not set...
- maxFileSize = HConstants.DEFAULT_MAX_FILE_SIZE;
+ useCurrentTime =
+ config.getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
+ QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME);
+ int guidepostPerRegion = config.getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB, 0);
+ if (guidepostPerRegion > 0) {
+ long maxFileSize = statsHTable.getTableDescriptor().getMaxFileSize();
+ if (maxFileSize <= 0) { // HBase brain dead API doesn't give you the "real" max file size if it's not set...
+ maxFileSize = HConstants.DEFAULT_MAX_FILE_SIZE;
+ }
+ guidepostDepth = maxFileSize / guidepostPerRegion;
+ } else {
+ guidepostDepth = config.getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
+ QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES);
}
- guidepostDepth = config.getLong(
- QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
- maxFileSize
- / config.getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
- QueryServicesOptions.DEFAULT_GUIDE_POSTS_PER_REGION));
// Get the stats table associated with the current table on which the CP is
// triggered
this.statsTable = StatisticsWriter.newWriter(statsHTable, tableName, clientTimeStamp);
}
+ public long getMaxTimeStamp() {
+ return maxTimeStamp;
+ }
+
public void close() throws IOException {
this.statsTable.close();
}
@@ -89,12 +103,12 @@ public class StatisticsCollector {
try {
ArrayList<Mutation> mutations = new ArrayList<Mutation>();
writeStatsToStatsTable(region, true, mutations, TimeKeeper.SYSTEM.getCurrentTime());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Committing new stats for the region " + region.getRegionInfo());
+ if (logger.isDebugEnabled()) {
+ logger.debug("Committing new stats for the region " + region.getRegionInfo());
}
commitStats(mutations);
} catch (IOException e) {
- LOG.error(e);
+ logger.error("Unable to commit new stats", e);
} finally {
clear();
}
@@ -106,20 +120,20 @@ public class StatisticsCollector {
// update the statistics table
for (ImmutableBytesPtr fam : familyMap.keySet()) {
if (delete) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Deleting the stats for the region " + region.getRegionInfo());
+ if(logger.isDebugEnabled()) {
+ logger.debug("Deleting the stats for the region "+region.getRegionInfo());
}
statsTable.deleteStats(region.getRegionInfo().getRegionNameAsString(), this, Bytes.toString(fam.copyBytesIfNecessary()),
mutations);
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding new stats for the region " + region.getRegionInfo());
+ if(logger.isDebugEnabled()) {
+ logger.debug("Adding new stats for the region "+region.getRegionInfo());
}
statsTable.addStats((region.getRegionInfo().getRegionNameAsString()), this, Bytes.toString(fam.copyBytesIfNecessary()),
mutations);
}
} catch (IOException e) {
- LOG.error("Failed to update statistics table!", e);
+ logger.error("Failed to update statistics table!", e);
throw e;
}
}
@@ -138,7 +152,7 @@ public class StatisticsCollector {
mutations);
}
} catch (IOException e) {
- LOG.error("Failed to delete from statistics table!", e);
+ logger.error("Failed to delete from statistics table!", e);
throw e;
}
}
@@ -164,29 +178,58 @@ public class StatisticsCollector {
}
}
- public void collectStatsDuringSplit(Configuration conf, HRegion l, HRegion r, HRegion parent)
+ public InternalScanner createCompactionScanner(HRegion region, Store store,
+ List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s)
+ throws IOException {
+ // See if this is for Major compaction
+ InternalScanner internalScan = s;
+ if (scanType.equals(ScanType.MAJOR_COMPACT)) {
+ // this is the first CP accessed, so we need to just create a major
+ // compaction scanner, just
+ // like in the compactor
+ if (s == null) {
+ Scan scan = new Scan();
+ scan.setMaxVersions(store.getFamily().getMaxVersions());
+ long smallestReadPoint = store.getHRegion().getSmallestReadPoint();
+ internalScan = new StoreScanner(store, store.getScanInfo(), scan, scanners, scanType,
+ smallestReadPoint, earliestPutTs);
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Compaction scanner created for stats");
+ }
+ InternalScanner scanner = getInternalScanner(region, store, internalScan, store.getColumnFamilyName());
+ if (scanner != null) {
+ internalScan = scanner;
+ }
+ }
+ return internalScan;
+ }
+
+
+ public void collectStatsDuringSplit(Configuration conf, HRegion l, HRegion r, HRegion region)
throws IOException {
// Invoke collectStat here
try {
// Create a delete operation on the parent region
// Then write the new guide posts for individual regions
List<Mutation> mutations = Lists.newArrayListWithExpectedSize(3);
- long currentTime = TimeKeeper.SYSTEM.getCurrentTime();
- deleteStatsFromStatsTable(parent, mutations, currentTime);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Collecting stats for the daughter region " + l.getRegionInfo());
+
+ long currentTime = useCurrentTime ? TimeKeeper.SYSTEM.getCurrentTime() : -1;
+ deleteStatsFromStatsTable(region, mutations, currentTime);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Collecting stats for the daughter region " + l.getRegionInfo());
}
collectStatsForSplitRegions(conf, l, mutations, currentTime);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Collecting stats for the daughter region " + r.getRegionInfo());
+ if (logger.isDebugEnabled()) {
+ logger.debug("Collecting stats for the daughter region " + r.getRegionInfo());
}
collectStatsForSplitRegions(conf, r, mutations, currentTime);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Committing stats for the daughter regions as part of split " + r.getRegionInfo());
+ if (logger.isDebugEnabled()) {
+ logger.debug("Committing stats for the daughter regions as part of split " + r.getRegionInfo());
}
} catch (IOException e) {
- LOG.error("Error while capturing stats after split of region "
- + parent.getRegionInfo().getRegionNameAsString(), e);
+ logger.error("Error while capturing stats after split of region "
+ + region.getRegionInfo().getRegionNameAsString(), e);
}
}
@@ -202,47 +245,20 @@ public class StatisticsCollector {
count = scanRegion(scanner, count);
writeStatsToStatsTable(daughter, false, mutations, currentTime);
} catch (IOException e) {
- LOG.error(e);
+ logger.error("Unable to collects stats during split", e);
toThrow = e;
} finally {
- try {
- if (scanner != null) scanner.close();
- } catch (IOException e) {
- LOG.error(e);
- if (toThrow != null) toThrow = e;
- } finally {
- if (toThrow != null) throw toThrow;
- }
- }
- }
-
- public InternalScanner createCompactionScanner(HRegion region, Store store,
- List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s)
- throws IOException {
- // See if this is for Major compaction
- InternalScanner internalScan = s;
- if (scanType.equals(ScanType.MAJOR_COMPACT)) {
- // this is the first CP accessed, so we need to just create a major
- // compaction scanner, just
- // like in the compactor
- if (s == null) {
- Scan scan = new Scan();
- scan.setMaxVersions(store.getFamily().getMaxVersions());
- long smallestReadPoint = store.getHRegion().getSmallestReadPoint();
- internalScan = new StoreScanner(store, store.getScanInfo(), scan, scanners, scanType,
- smallestReadPoint, earliestPutTs);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Compaction scanner created for stats");
- }
- InternalScanner scanner = getInternalScanner(region, store, internalScan, store.getColumnFamilyName());
- if (scanner != null) {
- internalScan = scanner;
+ try {
+ if (scanner != null) scanner.close();
+ } catch (IOException e) {
+ logger.error("Unable to close scanner after split", e);
+ if (toThrow != null) toThrow = e;
+ } finally {
+ if (toThrow != null) throw toThrow;
}
}
- return internalScan;
}
-
+
private Scan createScan(Configuration conf) {
Scan scan = new Scan();
scan.setCaching(conf.getInt(QueryServices.SCAN_CACHE_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_SCAN_CACHE_SIZE));
@@ -262,6 +278,7 @@ public class StatisticsCollector {
this.minMap.clear();
this.guidePostsMap.clear();
this.familyMap.clear();
+ maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
}
public void updateStatistic(KeyValue kv) {
@@ -285,6 +302,7 @@ public class StatisticsCollector {
maxMap.put(fam, row);
}
}
+ maxTimeStamp = Math.max(maxTimeStamp, kv.getTimestamp());
// TODO : This can be moved to an interface so that we could collect guide posts in different ways
Pair<Long,GuidePostsInfo> gps = guidePostsMap.get(fam);
if (gps == null) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6c94dc6e/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
index 4e28123..239085d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
@@ -33,7 +33,6 @@ public class StatisticsScanner implements InternalScanner {
public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter stats, HRegion region,
InternalScanner delegate, byte[] family) {
- // should there be only one tracker?
this.tracker = tracker;
this.stats = stats;
this.delegate = delegate;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6c94dc6e/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
index 8d7dd00..a48b04a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
@@ -65,6 +65,7 @@ public class StatisticsUtil {
ResultScanner scanner = statsHTable.getScanner(s);
try {
Result result = null;
+ long timeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
TreeMap<byte[], GuidePostsInfo> guidePostsPerCf = new TreeMap<byte[], GuidePostsInfo>(Bytes.BYTES_COMPARATOR);
while ((result = scanner.next()) != null) {
KeyValue current = result.raw()[0];
@@ -78,9 +79,12 @@ public class StatisticsUtil {
if (oldInfo != null) {
newInfo.combine(oldInfo);
}
+ if (current.getTimestamp() > timeStamp) {
+ timeStamp = current.getTimestamp();
+ }
}
if (!guidePostsPerCf.isEmpty()) {
- return new PTableStatsImpl(guidePostsPerCf);
+ return new PTableStatsImpl(guidePostsPerCf, timeStamp);
}
} finally {
scanner.close();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6c94dc6e/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
index 3f391f5..4118bb9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
@@ -44,7 +44,9 @@ public class StatisticsWriter implements Closeable {
clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime();
}
StatisticsWriter statsTable = new StatisticsWriter(hTable, tableName, clientTimeStamp);
- statsTable.commitLastStatsUpdatedTime();
+ if (clientTimeStamp != StatisticsCollector.NO_TIMESTAMP) { // Otherwise we do this later as we don't know the ts yet
+ statsTable.commitLastStatsUpdatedTime();
+ }
return statsTable;
}
@@ -83,26 +85,31 @@ public class StatisticsWriter implements Closeable {
*/
public void addStats(String regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations) throws IOException {
if (tracker == null) { return; }
-
+ boolean useMaxTimeStamp = clientTimeStamp == StatisticsCollector.NO_TIMESTAMP;
+ long timeStamp = clientTimeStamp;
+ if (useMaxTimeStamp) { // When using max timestamp, we write the update time later because we only know the ts now
+ timeStamp = tracker.getMaxTimeStamp();
+ mutations.add(getLastStatsUpdatedTimePut(timeStamp));
+ }
byte[] prefix = StatisticsUtil.getRowKey(tableName, PDataType.VARCHAR.toBytes(fam),
PDataType.VARCHAR.toBytes(regionName));
Put put = new Put(prefix);
GuidePostsInfo gp = tracker.getGuidePosts(fam);
if (gp != null) {
put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_COUNT_BYTES,
- clientTimeStamp, PDataType.LONG.toBytes((gp.getGuidePosts().size())));
+ timeStamp, PDataType.LONG.toBytes((gp.getGuidePosts().size())));
put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES,
- clientTimeStamp, PDataType.VARBINARY.toBytes(gp.toBytes()));
+ timeStamp, PDataType.VARBINARY.toBytes(gp.toBytes()));
put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES,
- clientTimeStamp, PDataType.LONG.toBytes(gp.getByteCount()));
+ timeStamp, PDataType.LONG.toBytes(gp.getByteCount()));
}
put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MIN_KEY_BYTES,
- clientTimeStamp, PDataType.VARBINARY.toBytes(tracker.getMinKey(fam)));
+ timeStamp, PDataType.VARBINARY.toBytes(tracker.getMinKey(fam)));
put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MAX_KEY_BYTES,
- clientTimeStamp, PDataType.VARBINARY.toBytes(tracker.getMaxKey(fam)));
+ timeStamp, PDataType.VARBINARY.toBytes(tracker.getMaxKey(fam)));
// Add our empty column value so queries behave correctly
put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES,
- clientTimeStamp, ByteUtil.EMPTY_BYTE_ARRAY);
+ timeStamp, ByteUtil.EMPTY_BYTE_ARRAY);
mutations.add(put);
}
@@ -115,21 +122,27 @@ public class StatisticsWriter implements Closeable {
}
}
- private void commitLastStatsUpdatedTime() throws IOException {
- // Always use wallclock time for this, as it's a mechanism to prevent
- // stats from being collected too often.
+ private Put getLastStatsUpdatedTimePut(long timeStamp) {
long currentTime = TimeKeeper.SYSTEM.getCurrentTime();
byte[] prefix = tableName;
Put put = new Put(prefix);
- put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME_BYTES, clientTimeStamp,
+ put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME_BYTES, timeStamp,
PDataType.DATE.toBytes(new Date(currentTime)));
+ return put;
+ }
+
+ private void commitLastStatsUpdatedTime() throws IOException {
+ // Always use wallclock time for this, as it's a mechanism to prevent
+ // stats from being collected too often.
+ Put put = getLastStatsUpdatedTimePut(clientTimeStamp);
statisticsTable.put(put);
}
public void deleteStats(String regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations)
throws IOException {
+ long timeStamp = clientTimeStamp == StatisticsCollector.NO_TIMESTAMP ? tracker.getMaxTimeStamp() : clientTimeStamp;
byte[] prefix = StatisticsUtil.getRowKey(tableName, PDataType.VARCHAR.toBytes(fam),
PDataType.VARCHAR.toBytes(regionName));
- mutations.add(new Delete(prefix, clientTimeStamp - 1));
+ mutations.add(new Delete(prefix, timeStamp - 1));
}
}
\ No newline at end of file