You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ya...@apache.org on 2020/12/16 02:06:37 UTC
[phoenix] branch 4.x updated: PHOENIX-5592 MapReduce job to
asynchronously delete rows where the VIEW_TTL has expired
This is an automated email from the ASF dual-hosted git repository.
yanxinyi pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new 5877936 PHOENIX-5592 MapReduce job to asynchronously delete rows where the VIEW_TTL has expired
5877936 is described below
commit 5877936257e0356f47eedd25346234230822e115
Author: Xinyi Yan <xy...@salesforce.com>
AuthorDate: Wed Nov 18 21:49:47 2020 -0800
PHOENIX-5592 MapReduce job to asynchronously delete rows where the VIEW_TTL has expired
Signed-off-by: Xinyi Yan <ya...@apache.org>
---
.../DefaultPhoenixMultiViewListProviderIT.java | 205 +++++
.../apache/phoenix/end2end/PhoenixTTLToolIT.java | 848 +++++++++++++++++++++
.../mapreduce/PhoenixMultiViewInputFormat.java | 113 +++
.../mapreduce/PhoenixMultiViewInputSplit.java | 77 ++
.../phoenix/mapreduce/PhoenixMultiViewReader.java | 86 +++
.../mapreduce/PhoenixTTLDeleteJobMapper.java | 241 ++++++
.../apache/phoenix/mapreduce/PhoenixTTLTool.java | 320 ++++++++
.../util/DefaultMultiViewJobStatusTracker.java | 50 ++
.../util/DefaultMultiViewSplitStrategy.java | 79 ++
.../util/DefaultPhoenixMultiViewListProvider.java | 200 +++++
.../mapreduce/util/MultiViewJobStatusTracker.java | 25 +
.../mapreduce/util/MultiViewSplitStrategy.java | 27 +
.../mapreduce/util/PhoenixConfigurationUtil.java | 55 +-
.../mapreduce/util/PhoenixMapReduceUtil.java | 17 +
.../mapreduce/util/PhoenixMultiInputUtil.java | 114 +++
.../util/PhoenixMultiViewListProvider.java | 25 +
.../phoenix/mapreduce/util/ViewInfoTracker.java | 108 +++
.../phoenix/mapreduce/util/ViewInfoWritable.java | 52 ++
.../org/apache/phoenix/query/QueryServices.java | 1 -
.../DefaultMultiViewSplitStrategyTest.java | 110 +++
.../mapreduce/PhoenixMultiViewInputFormatTest.java | 86 +++
.../mapreduce/PhoenixMultiViewReaderTest.java | 86 +++
.../phoenix/mapreduce/PhoenixTTLToolTest.java | 81 ++
23 files changed, 2999 insertions(+), 7 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultPhoenixMultiViewListProviderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultPhoenixMultiViewListProviderIT.java
new file mode 100644
index 0000000..94b8b56
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultPhoenixMultiViewListProviderIT.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.mapreduce.util.DefaultPhoenixMultiViewListProvider;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.List;
+
+import static org.apache.phoenix.mapreduce.PhoenixTTLTool.DELETE_ALL_VIEWS;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_QUERY_BATCH_SIZE;
+import static org.junit.Assert.assertEquals;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class DefaultPhoenixMultiViewListProviderIT extends ParallelStatsDisabledIT {
+ // DDL templates (String.format patterns) used by the test below; the *_TTL variants set PHOENIX_TTL.
+ private static final String BASE_TABLE_DDL = "CREATE TABLE %s (TENANT_ID CHAR(10) NOT NULL, " +
+ "ID CHAR(10) NOT NULL, NUM BIGINT CONSTRAINT " +
+ "PK PRIMARY KEY (TENANT_ID,ID)) MULTI_TENANT=true, COLUMN_ENCODED_BYTES = 0";
+ private static final String VIEW_DDL = "CREATE VIEW %s (" +
+ "PK1 BIGINT PRIMARY KEY,A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
+ " AS SELECT * FROM %s ";
+ private static final String VIEW_DDL_WITH_ID_PREFIX_AND_TTL = VIEW_DDL + " PHOENIX_TTL = 1000";
+ private static final String VIEW_INDEX_DDL = "CREATE INDEX %s ON %s(%s)";
+ private static final String TENANT_VIEW_DDL =
+ "CREATE VIEW %s (E BIGINT, F BIGINT) AS SELECT * FROM %s";
+ private static final String TENANT_VIEW_DDL_WITH_TTL = TENANT_VIEW_DDL + " PHOENIX_TTL = 1000";
+
+ @Test
+ public void testGetPhoenixMultiViewList() throws Exception {
+ String schema = generateUniqueName();
+ String baseTableFullName = schema + "." + generateUniqueName();
+ String globalViewName1 = schema + "." + generateUniqueName();
+ String globalViewName2 = schema + "." + generateUniqueName();
+ String tenantViewName1 = schema + "." + generateUniqueName();
+ String tenantViewName2 = schema + "." + generateUniqueName();
+ String tenantViewName3 = schema + "." + generateUniqueName();
+ String tenantViewName4 = schema + "." + generateUniqueName();
+ String indexTable1 = generateUniqueName() + "_IDX";
+ String indexTable2 = generateUniqueName() + "_IDX";
+ String indexTable3 = generateUniqueName() + "_IDX";
+ String indexTable4 = generateUniqueName() + "_IDX";
+ String tenant1 = generateUniqueName();
+ String tenant2 = generateUniqueName();
+ DefaultPhoenixMultiViewListProvider defaultPhoenixMultiViewListProvider =
+ new DefaultPhoenixMultiViewListProvider();
+ Configuration cloneConfig = PropertiesUtil.cloneConfig(config);
+
+ cloneConfig.set(PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_ALL_VIEWS,
+ DELETE_ALL_VIEWS);
+ cloneConfig.set(MAPREDUCE_MULTI_INPUT_QUERY_BATCH_SIZE,"2");
+ List<ViewInfoWritable> result =
+ defaultPhoenixMultiViewListProvider.getPhoenixMultiViewList(cloneConfig);
+
+ /*
+ Case 1: no view has been created yet, so the provider returns an empty list
+ */
+ assertEquals(0, result.size());
+
+ /*
+ Case 2:
+ BaseMultiTenantTable
+ GlobalView1 with TTL (PHOENIX_TTL = 1000)
+ Index1 Index2
+ TenantView1, TenantView2
+ */
+ try (Connection globalConn = DriverManager.getConnection(url);
+ Connection tenant1Connection =
+ PhoenixMultiInputUtil.buildTenantConnection(url, tenant1);
+ Connection tenant2Connection =
+ PhoenixMultiInputUtil.buildTenantConnection(url, tenant2)) {
+
+ globalConn.createStatement().execute(String.format(BASE_TABLE_DDL, baseTableFullName));
+ globalConn.createStatement().execute(String.format(VIEW_DDL_WITH_ID_PREFIX_AND_TTL,
+ globalViewName1, baseTableFullName));
+
+ globalConn.createStatement().execute(
+ String.format(VIEW_INDEX_DDL, indexTable1, globalViewName1, "A,B"));
+ globalConn.createStatement().execute(
+ String.format(VIEW_INDEX_DDL, indexTable2, globalViewName1, "C,D"));
+
+ tenant1Connection.createStatement().execute(
+ String.format(TENANT_VIEW_DDL,tenantViewName1, globalViewName1));
+ tenant2Connection.createStatement().execute(
+ String.format(TENANT_VIEW_DDL,tenantViewName2, globalViewName1));
+ }
+
+ result = defaultPhoenixMultiViewListProvider.getPhoenixMultiViewList(cloneConfig);
+ // A view with 2 view indexes yields 3 deletion jobs:
+ // 1 for the data table and 2 for the index table.
+ assertEquals(3, result.size());
+
+ /*
+ Case 3: adding GlobalView2 without TTL does not change the result
+ BaseMultiTenantTable
+ GlobalView1 with TTL (PHOENIX_TTL = 1000) GlobalView2 without TTL
+ Index1 Index2 Index3 Index4
+ TenantView1, TenantView2
+ */
+ try (Connection globalConn = DriverManager.getConnection(url)) {
+ globalConn.createStatement().execute(String.format(VIEW_DDL,
+ globalViewName2, baseTableFullName));
+ globalConn.createStatement().execute(
+ String.format(VIEW_INDEX_DDL, indexTable3, globalViewName2, "A,B"));
+ globalConn.createStatement().execute(
+ String.format(VIEW_INDEX_DDL, indexTable4, globalViewName2, "C,D"));
+ }
+ result = defaultPhoenixMultiViewListProvider.getPhoenixMultiViewList(cloneConfig);
+ assertEquals(3, result.size());
+
+ /*
+ Case 4: adding TenantView3 (with TTL) and TenantView4 (without TTL) under GlobalView2
+ BaseMultiTenantTable
+ GlobalView1 with TTL (PHOENIX_TTL = 1000) GlobalView2 without TTL
+ Index1 Index2 Index3 Index4
+ TenantView1, TenantView2 TenantView3 with TTL, TenantView4 without TTL
+ */
+
+ try (Connection tenant1Connection =
+ PhoenixMultiInputUtil.buildTenantConnection(url, tenant1);
+ Connection tenant2Connection =
+ PhoenixMultiInputUtil.buildTenantConnection(url, tenant2)) {
+ tenant1Connection.createStatement().execute(
+ String.format(TENANT_VIEW_DDL_WITH_TTL,tenantViewName3, globalViewName2));
+ tenant2Connection.createStatement().execute(
+ String.format(TENANT_VIEW_DDL,tenantViewName4, globalViewName2));
+ }
+ result = defaultPhoenixMultiViewListProvider.getPhoenixMultiViewList(cloneConfig);
+ assertEquals(6, result.size());
+
+ /*
+ Tenant-specific case. Even though tenant1 created 2 leaf views, one of them was created
+ under a global view with TTL, so that leaf view is not added to the deletion list.
+ */
+ cloneConfig = PropertiesUtil.cloneConfig(config);
+ cloneConfig.set(MAPREDUCE_MULTI_INPUT_QUERY_BATCH_SIZE,"2");
+ cloneConfig.set(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID, tenant1);
+ result = defaultPhoenixMultiViewListProvider.getPhoenixMultiViewList(cloneConfig);
+ assertEquals(3, result.size());
+
+ /*
+ Deleting tenant1 with tenantViewName1 will NOT add any deletion job to the list because
+ the parent global view has a TTL value.
+ */
+ cloneConfig.set(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID, tenant1);
+ cloneConfig.set(PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_PER_VIEW,
+ tenantViewName1);
+ result = defaultPhoenixMultiViewListProvider.getPhoenixMultiViewList(cloneConfig);
+ assertEquals(0, result.size());
+
+ /*
+ Without a tenant id, no job is added to the list even if the tenant view name is
+ provided.
+ */
+ cloneConfig = PropertiesUtil.cloneConfig(config);
+ cloneConfig.set(MAPREDUCE_MULTI_INPUT_QUERY_BATCH_SIZE,"2");
+ cloneConfig.set(PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_PER_VIEW,
+ tenantViewName3);
+ result = defaultPhoenixMultiViewListProvider.getPhoenixMultiViewList(cloneConfig);
+ assertEquals(0, result.size());
+
+ /*
+ tenant id + tenant view name will add 3 jobs to the list:
+ 1 for the data table and 2 for the index table.
+ */
+ cloneConfig.set(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID, tenant1);
+ cloneConfig.set(PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_PER_VIEW,
+ tenantViewName3);
+ result = defaultPhoenixMultiViewListProvider.getPhoenixMultiViewList(cloneConfig);
+ assertEquals(3, result.size());
+
+ /*
+ tenant id + tenant view name will NOT add to the list because tenantViewName4 does NOT
+ have a TTL value.
+ */
+ cloneConfig.set(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID, tenant2);
+ cloneConfig.set(PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_PER_VIEW,
+ tenantViewName4);
+ result = defaultPhoenixMultiViewListProvider.getPhoenixMultiViewList(cloneConfig);
+ assertEquals(0, result.size());
+ }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixTTLToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixTTLToolIT.java
new file mode 100644
index 0000000..d1d101f
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixTTLToolIT.java
@@ -0,0 +1,848 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.PhoenixTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class PhoenixTTLToolIT extends ParallelStatsDisabledIT {
+
+ private static final long PHOENIX_TTL_EXPIRE_IN_A_SECOND = 1; // PHOENIX_TTL of views whose rows must expire
+ private static final long MILLISECOND = 1000; // multiplied with the TTL above to get the Thread.sleep() time
+ private static final long PHOENIX_TTL_EXPIRE_IN_A_DAY = 1000 * 60 * 60 * 24; // PHOENIX_TTL of views whose rows must survive
+
+ private static final String VIEW_PREFIX1 = "V01"; // ID value matched by the first global view
+ private static final String VIEW_PREFIX2 = "V02"; // ID value matched by the second global view
+ private static final String UPSERT_TO_GLOBAL_VIEW_QUERY =
+ "UPSERT INTO %s (PK1,A,B,C,D) VALUES(1,1,1,1,1)";
+ private static final String UPSERT_TO_LEAF_VIEW_QUERY =
+ "UPSERT INTO %s (PK1,A,B,C,D,E,F) VALUES(1,1,1,1,1,1,1)";
+ private static final String VIEW_DDL_WITH_ID_PREFIX_AND_TTL = "CREATE VIEW %s (" +
+ "PK1 BIGINT PRIMARY KEY,A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
+ " AS SELECT * FROM %s WHERE ID = '%s' PHOENIX_TTL = %d";
+ private static final String VIEW_INDEX_DDL = "CREATE INDEX %s ON %s(%s)";
+ private static final String TENANT_VIEW_DDL =
+ "CREATE VIEW %s (E BIGINT, F BIGINT) AS SELECT * FROM %s";
+
+ private void verifyNumberOfRowsFromHBaseLevel(String tableName, String regrex, int expectedRows)
+ throws Exception { // asserts how many rows of the table pass a RowFilter built from regrex
+ Scan rowKeyScan = new Scan();
+ rowKeyScan.setFilter(
+ new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regrex)));
+ byte[] tableNameBytes = SchemaUtil.getTableNameAsBytes(
+ SchemaUtil.getSchemaNameFromFullName(tableName),
+ SchemaUtil.getTableNameFromFullName(tableName));
+ try (Table hbaseTable = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES)
+ .getTable(tableNameBytes)) {
+ assertEquals(expectedRows, getRowCount(hbaseTable, rowKeyScan));
+ }
+ }
+
+ private void verifyNumberOfRows(String tableName, String tenantId, int expectedRows,
+ Connection conn) throws Exception {
+ // Counts all rows of the table, or only the rows of one tenant when tenantId is non-null.
+ StringBuilder countSql = new StringBuilder("SELECT COUNT(*) FROM ").append(tableName);
+ if (tenantId != null) {
+ countSql.append(" WHERE TENANT_ID = '").append(tenantId).append("'");
+ }
+ try (Statement statement = conn.createStatement()) {
+ ResultSet counts = statement.executeQuery(countSql.toString());
+ assertTrue(counts.next());
+ assertEquals(expectedRows, counts.getInt(1));
+ }
+ }
+
+ private long getRowCount(Table table, Scan scan) throws Exception {
+ // Returns the number of results the scan yields; the scanner is closed even if next() throws.
+ try (ResultScanner scanner = table.getScanner(scan)) {
+ long numMatchingRows = 0;
+ for (Result result = scanner.next(); result != null; result = scanner.next()) {
+ numMatchingRows++;
+ }
+ return numMatchingRows;
+ }
+ }
+ private void createMultiTenantTable(Connection conn, String tableName) throws Exception {
+ // Executes the CREATE TABLE statement for a MULTI_TENANT table keyed by (TENANT_ID, ID).
+ String createTableSql = String.format("CREATE TABLE %s" +
+ " (TENANT_ID CHAR(10) NOT NULL, ID CHAR(10) NOT NULL, NUM BIGINT CONSTRAINT " +
+ "PK PRIMARY KEY (TENANT_ID,ID)) MULTI_TENANT=true, COLUMN_ENCODED_BYTES = 0", tableName);
+ try (Statement ddlStatement = conn.createStatement()) {
+ ddlStatement.execute(createTableSql);
+ }
+ }
+
+ /*
+ BaseMultiTenantTable
+ GlobalView1 with TTL (PHOENIX_TTL_EXPIRE_IN_A_SECOND)
+ Index1 Index2
+
+ Creates 2 tenant views on GlobalView1 and upserts one row through each of them.
+ After running the MR job, all rows should be gone from the base table and the index table.
+ */
+ @Test
+ public void testTenantViewOnGlobalViewWithMoreThanOneIndex() throws Exception {
+ String schema = generateUniqueName();
+ String baseTableFullName = schema + "." + generateUniqueName();
+ String indexTable1 = generateUniqueName() + "_IDX";
+ String indexTable2 = generateUniqueName() + "_IDX";
+ String globalViewName = schema + "." + generateUniqueName();
+ String tenant1 = generateUniqueName();
+ String tenant2 = generateUniqueName();
+ String tenantView1 = schema + "." + generateUniqueName();
+ String tenantView2 = schema + "." + generateUniqueName();
+ String indexTable = "_IDX_" + baseTableFullName;
+
+ try (Connection globalConn = DriverManager.getConnection(getUrl());
+ Connection tenant1Connection =
+ PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+ Connection tenant2Connection =
+ PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+ createMultiTenantTable(globalConn, baseTableFullName);
+ globalConn.createStatement().execute(String.format(VIEW_DDL_WITH_ID_PREFIX_AND_TTL,
+ globalViewName, baseTableFullName, VIEW_PREFIX1,
+ PHOENIX_TTL_EXPIRE_IN_A_SECOND));
+
+ globalConn.createStatement().execute(
+ String.format(VIEW_INDEX_DDL, indexTable1, globalViewName, "A,B"));
+ globalConn.createStatement().execute(
+ String.format(VIEW_INDEX_DDL, indexTable2, globalViewName, "C,D"));
+
+ tenant1Connection.setAutoCommit(true);
+ tenant2Connection.setAutoCommit(true);
+
+ tenant1Connection.createStatement().execute(
+ String.format(TENANT_VIEW_DDL,tenantView1, globalViewName));
+ tenant2Connection.createStatement().execute(
+ String.format(TENANT_VIEW_DDL,tenantView2, globalViewName));
+
+ tenant1Connection.createStatement().execute(
+ String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantView1));
+ verifyNumberOfRows(baseTableFullName, tenant1, 1, globalConn);
+ tenant2Connection.createStatement().execute(
+ String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantView2));
+
+ // wait for the rows to expire and the index to be updated
+ Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
+ verifyNumberOfRows(baseTableFullName, tenant2, 1, globalConn);
+
+ // the view has 2 view indexes, so upserting 1 row (base table) results in
+ // 2 rows (index table)
+ verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant1 + ".*", 2);
+ verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant2 + ".*", 2);
+
+ // run the MR job in the foreground ("-runfg") for all views ("-a") to delete expired rows.
+ PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool();
+ Configuration conf = new Configuration(getUtility().getConfiguration());
+ phoenixTtlTool.setConf(conf);
+ int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"});
+ assertEquals(0, status);
+
+ verifyNumberOfRows(baseTableFullName, tenant1, 0, globalConn);
+ verifyNumberOfRows(baseTableFullName, tenant2, 0, globalConn);
+ verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant2 + ".*", 0);
+ verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant1 + ".*", 0);
+ }
+ }
+
+ /*
+ BaseMultiTenantTable
+ GlobalView1 with TTL (PHOENIX_TTL_EXPIRE_IN_A_SECOND) GlobalView2 with TTL (PHOENIX_TTL_EXPIRE_IN_A_DAY)
+ Index1 Index2 Index3 Index4
+
+ Upserts data into both global views from two tenant connections and runs the MR job.
+ It should delete only the GlobalView1 data and leave the GlobalView2 data in place.
+ */
+ @Test
+ public void testGlobalViewWithMoreThanOneIndex() throws Exception {
+ String schema = generateUniqueName();
+ String baseTableFullName = schema + "." + generateUniqueName();
+ String globalViewName1 = schema + "." + generateUniqueName();
+ String globalViewName2 = schema + "." + generateUniqueName();
+ String indexTable1 = generateUniqueName() + "_IDX";
+ String indexTable2 = generateUniqueName() + "_IDX";
+ String indexTable3 = generateUniqueName() + "_IDX";
+ String indexTable4 = generateUniqueName() + "_IDX";
+ String indexTable = "_IDX_" + baseTableFullName;
+ String tenant1 = generateUniqueName();
+ String tenant2 = generateUniqueName();
+
+ try (Connection globalConn = DriverManager.getConnection(getUrl());
+ Connection tenant1Connection =
+ PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+ Connection tenant2Connection =
+ PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+ createMultiTenantTable(globalConn, baseTableFullName);
+
+ globalConn.createStatement().execute(String.format(VIEW_DDL_WITH_ID_PREFIX_AND_TTL,
+ globalViewName1, baseTableFullName, VIEW_PREFIX1,
+ PHOENIX_TTL_EXPIRE_IN_A_SECOND));
+ globalConn.createStatement().execute(String.format(VIEW_DDL_WITH_ID_PREFIX_AND_TTL,
+ globalViewName2, baseTableFullName, VIEW_PREFIX2, PHOENIX_TTL_EXPIRE_IN_A_DAY));
+
+ globalConn.createStatement().execute(
+ String.format(VIEW_INDEX_DDL, indexTable1, globalViewName1, "A,B"));
+ globalConn.createStatement().execute(
+ String.format(VIEW_INDEX_DDL, indexTable2, globalViewName1, "C,D"));
+
+ globalConn.createStatement().execute(
+ String.format(VIEW_INDEX_DDL, indexTable3, globalViewName2, "A,B"));
+ globalConn.createStatement().execute(
+ String.format(VIEW_INDEX_DDL, indexTable4, globalViewName2, "C,D"));
+
+ tenant1Connection.setAutoCommit(true);
+ tenant2Connection.setAutoCommit(true);
+
+ tenant1Connection.createStatement().execute(
+ String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName1));
+ tenant1Connection.createStatement().execute(
+ String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName2));
+ tenant2Connection.createStatement().execute(
+ String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName1));
+ tenant2Connection.createStatement().execute(
+ String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName2));
+
+ // wait for the GlobalView1 rows to expire and the index to be updated
+ Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
+
+ verifyNumberOfRows(baseTableFullName, tenant1, 2, globalConn);
+ verifyNumberOfRows(baseTableFullName, tenant2, 2, globalConn);
+
+ verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant1 + ".*", 4);
+ verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant2 + ".*", 4);
+
+ // run the MR job in the foreground ("-runfg") for all views ("-a") to delete expired rows.
+ PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool();
+ Configuration conf = new Configuration(getUtility().getConfiguration());
+ phoenixTtlTool.setConf(conf);
+ int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"});
+ assertEquals(0, status);
+
+ verifyNumberOfRows(baseTableFullName, tenant1, 1, globalConn);
+ verifyNumberOfRows(baseTableFullName, tenant2, 1, globalConn);
+ verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant2 + ".*", 2);
+ verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant1 + ".*", 2);
+ }
+ }
+
+ /*
+ BaseMultiTenantTable
+ GlobalView1 with TTL (PHOENIX_TTL_EXPIRE_IN_A_SECOND) GlobalView2 with TTL (PHOENIX_TTL_EXPIRE_IN_A_DAY)
+ Index1 Index2 Index3 Index4
+ TenantView1 TenantView2
+
+ Upserts data through the tenant views of both global views and runs the MR job.
+ It should delete only the GlobalView1 data and leave the GlobalView2 data in place.
+ */
+ @Test
+ public void testTenantViewCase() throws Exception {
+ String schema = generateUniqueName();
+ String baseTableFullName = schema + "." + generateUniqueName();
+ String globalViewName1 = schema + "." + generateUniqueName();
+ String globalViewName2 = schema + "." + generateUniqueName();
+ String tenantViewName1 = schema + "." + generateUniqueName();
+ String tenantViewName2 = schema + "." + generateUniqueName();
+ String indexTable1 = generateUniqueName() + "_IDX";
+ String indexTable2 = generateUniqueName() + "_IDX";
+ String indexTable3 = generateUniqueName() + "_IDX";
+ String indexTable4 = generateUniqueName() + "_IDX";
+ String indexTable = "_IDX_" + baseTableFullName;
+ String tenant1 = generateUniqueName();
+ String tenant2 = generateUniqueName();
+
+ try (Connection globalConn = DriverManager.getConnection(getUrl());
+ Connection tenant1Connection =
+ PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+ Connection tenant2Connection =
+ PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+ createMultiTenantTable(globalConn, baseTableFullName);
+ String ddl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " +
+ "A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
+ " AS SELECT * FROM " + baseTableFullName + " WHERE ID ='%s' PHOENIX_TTL = %d";
+
+ globalConn.createStatement().execute(
+ String.format(ddl, globalViewName1, VIEW_PREFIX1,
+ PHOENIX_TTL_EXPIRE_IN_A_SECOND));
+ globalConn.createStatement().execute(
+ String.format(ddl, globalViewName2, VIEW_PREFIX2, PHOENIX_TTL_EXPIRE_IN_A_DAY));
+
+ ddl = "CREATE INDEX %s ON %s(%s)";
+
+ globalConn.createStatement().execute(
+ String.format(ddl, indexTable1, globalViewName1, "A,B"));
+ globalConn.createStatement().execute(
+ String.format(ddl, indexTable2, globalViewName1, "C,D"));
+
+ globalConn.createStatement().execute(
+ String.format(ddl, indexTable3, globalViewName2, "A,B"));
+ globalConn.createStatement().execute(
+ String.format(ddl, indexTable4, globalViewName2, "C,D"));
+
+ ddl = "CREATE VIEW %s (E BIGINT, F BIGINT) AS SELECT * FROM %s";
+ tenant1Connection.createStatement().execute(
+ String.format(ddl, tenantViewName1, globalViewName1));
+ tenant1Connection.createStatement().execute(
+ String.format(ddl, tenantViewName2, globalViewName2));
+
+ tenant2Connection.createStatement().execute(
+ String.format(ddl, tenantViewName1, globalViewName1));
+ tenant2Connection.createStatement().execute(
+ String.format(ddl, tenantViewName2, globalViewName2));
+
+ tenant1Connection.setAutoCommit(true);
+ tenant2Connection.setAutoCommit(true);
+
+ tenant1Connection.createStatement().execute(
+ String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName1));
+ tenant1Connection.createStatement().execute(
+ String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName2));
+ tenant2Connection.createStatement().execute(
+ String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName1));
+ tenant2Connection.createStatement().execute(
+ String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName2));
+
+ // wait for the GlobalView1 rows to expire and the index to be updated
+ Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
+
+ verifyNumberOfRows(baseTableFullName, tenant1, 2, globalConn);
+ verifyNumberOfRows(baseTableFullName, tenant2, 2, globalConn);
+
+ verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant1 + ".*", 4);
+ verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant2 + ".*", 4);
+
+ // run the MR job in the foreground ("-runfg") for all views ("-a") to delete expired rows.
+ PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool();
+ Configuration conf = new Configuration(getUtility().getConfiguration());
+ phoenixTtlTool.setConf(conf);
+ int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"});
+ assertEquals(0, status);
+
+ verifyNumberOfRows(baseTableFullName, tenant1, 1, globalConn);
+ verifyNumberOfRows(baseTableFullName, tenant2, 1, globalConn);
+ verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant2 + ".*", 2);
+ verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant1 + ".*", 2);
+ }
+ }
+
+ /*
+ BaseMultiTenantTable
+ GlobalView1 with TTL (PHOENIX_TTL_EXPIRE_IN_A_SECOND) GlobalView2 with TTL (PHOENIX_TTL_EXPIRE_IN_A_DAY)
+ Upserts data into both global views (no indexes are created) and runs the MR job.
+ It should delete only the GlobalView1 data and leave the GlobalView2 data in place.
+ */
+ @Test
+ public void testGlobalViewWithNoIndex() throws Exception {
+ String schema = generateUniqueName();
+ String baseTableFullName = schema + "." + generateUniqueName();
+ String globalViewName1 = schema + "." + generateUniqueName();
+ String globalViewName2 = schema + "." + generateUniqueName();
+ String tenant1 = generateUniqueName();
+ String tenant2 = generateUniqueName();
+
+ try (Connection globalConn = DriverManager.getConnection(getUrl());
+ Connection tenant1Connection =
+ PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+ Connection tenant2Connection =
+ PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+ createMultiTenantTable(globalConn, baseTableFullName);
+ String ddl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " +
+ "A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
+ " AS SELECT * FROM " + baseTableFullName + " WHERE ID ='%s' PHOENIX_TTL = %d";
+
+ globalConn.createStatement().execute(
+ String.format(ddl, globalViewName1, VIEW_PREFIX1,
+ PHOENIX_TTL_EXPIRE_IN_A_SECOND));
+ globalConn.createStatement().execute(
+ String.format(ddl, globalViewName2, VIEW_PREFIX2, PHOENIX_TTL_EXPIRE_IN_A_DAY));
+
+ tenant1Connection.setAutoCommit(true);
+ tenant2Connection.setAutoCommit(true);
+
+ tenant1Connection.createStatement().execute(
+ String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName1));
+ tenant1Connection.createStatement().execute(
+ String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName2));
+
+ tenant2Connection.createStatement().execute(
+ String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName1));
+ tenant2Connection.createStatement().execute(
+ String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName2));
+
+ // wait for the GlobalView1 rows to expire
+ Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
+
+ verifyNumberOfRows(baseTableFullName, tenant1, 2, globalConn);
+ verifyNumberOfRows(baseTableFullName, tenant2, 2, globalConn);
+
+ // run the MR job in the foreground ("-runfg") for all views ("-a") to delete expired rows.
+ PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool();
+ Configuration conf = new Configuration(getUtility().getConfiguration());
+ phoenixTtlTool.setConf(conf);
+ int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"});
+ assertEquals(0, status);
+
+ verifyNumberOfRows(baseTableFullName, tenant1, 1, globalConn);
+ verifyNumberOfRows(baseTableFullName, tenant2, 1, globalConn);
+ }
+ }
+
+ /*
+ BaseTable (not multi-tenant)
+ GlobalView1 with TTL (PHOENIX_TTL_EXPIRE_IN_A_SECOND) GlobalView2 with TTL (PHOENIX_TTL_EXPIRE_IN_A_DAY)
+ Upserts data into both global views over the global connection and runs the MR job.
+ It should delete only the GlobalView1 data and leave the GlobalView2 data in place.
+ */
+ @Test
+ public void testGlobalViewOnNonMultiTenantTable() throws Exception {
+ String schema = generateUniqueName();
+ String baseTableFullName = schema + "." + generateUniqueName();
+ String globalViewName1 = schema + "." + generateUniqueName();
+ String globalViewName2 = schema + "." + generateUniqueName();
+
+ try (Connection globalConn = DriverManager.getConnection(getUrl())) {
+ String ddl = "CREATE TABLE " + baseTableFullName +
+ " (ID CHAR(10) NOT NULL PRIMARY KEY, NUM BIGINT)";
+ globalConn.createStatement().execute(ddl);
+
+ ddl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " +
+ "A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
+ " AS SELECT * FROM " + baseTableFullName + " WHERE ID ='%s' PHOENIX_TTL = %d";
+
+ globalConn.createStatement().execute(
+ String.format(ddl, globalViewName1, VIEW_PREFIX1,
+ PHOENIX_TTL_EXPIRE_IN_A_SECOND));
+ globalConn.createStatement().execute(
+ String.format(ddl, globalViewName2, VIEW_PREFIX2, PHOENIX_TTL_EXPIRE_IN_A_DAY));
+
+ globalConn.setAutoCommit(true);
+
+ globalConn.createStatement().execute(
+ String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName1));
+ globalConn.createStatement().execute(
+ String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName2));
+
+ // wait for the GlobalView1 row to expire
+ Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
+
+ verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 1);
+ verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 1);
+
+ // run the MR job in the foreground ("-runfg") for all views ("-a") to delete expired rows.
+ PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool();
+ Configuration conf = new Configuration(getUtility().getConfiguration());
+ phoenixTtlTool.setConf(conf);
+ int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"});
+ assertEquals(0, status);
+ verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 0);
+ verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 1);
+ }
+ }
+
+ /*
+ BaseTable
+ GlobalView1 with TTL(1 ms) GlobalView2 with TTL(1 DAY)
+ Index1 Index2 Index3 Index4
+
+ Upserting data to both global views, and run the MR job.
+ It should only delete GlobalView1 data not remove GlobalView2 data.
+ */
+ @Test
+ public void testGlobalViewOnNonMultiTenantTableWithIndex() throws Exception {
+     final String schemaName = generateUniqueName();
+     final String baseTable = schemaName + "." + generateUniqueName();
+     final String shortTtlView = schemaName + "." + generateUniqueName();
+     final String longTtlView = schemaName + "." + generateUniqueName();
+     final String shortTtlIndexA = generateUniqueName() + "_IDX";
+     final String shortTtlIndexC = generateUniqueName() + "_IDX";
+     final String longTtlIndexA = generateUniqueName() + "_IDX";
+     final String longTtlIndexC = generateUniqueName() + "_IDX";
+     // Table name under which the index rows of this base table are counted below.
+     final String viewIndexTable = "_IDX_" + baseTable;
+     final String prefix1Rows = ".*" + VIEW_PREFIX1 + ".*";
+     final String prefix2Rows = ".*" + VIEW_PREFIX2 + ".*";
+
+     try (Connection conn = DriverManager.getConnection(getUrl())) {
+         final String createBaseTable = "CREATE TABLE " + baseTable +
+                 " (PK1 BIGINT NOT NULL, ID CHAR(10) NOT NULL, NUM BIGINT CONSTRAINT " +
+                 "PK PRIMARY KEY (PK1,ID))";
+         conn.createStatement().execute(createBaseTable);
+
+         // One template for both views; only the name, the PK1 filter and the TTL differ.
+         final String createView = "CREATE VIEW %s (PK2 BIGINT PRIMARY KEY, " +
+                 "A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
+                 " AS SELECT * FROM " + baseTable + " WHERE PK1=%d PHOENIX_TTL = %d";
+         final String createShortTtlView =
+                 String.format(createView, shortTtlView, 1, PHOENIX_TTL_EXPIRE_IN_A_SECOND);
+         final String createLongTtlView =
+                 String.format(createView, longTtlView, 2, PHOENIX_TTL_EXPIRE_IN_A_DAY);
+         conn.createStatement().execute(createShortTtlView);
+         conn.createStatement().execute(createLongTtlView);
+
+         // Two indexes per view, created in the same order as listed here.
+         final String createIndex = "CREATE INDEX %s ON %s(%s)";
+         final String[][] indexSpecs = {
+             {shortTtlIndexA, shortTtlView, "A,ID,B"},
+             {shortTtlIndexC, shortTtlView, "C,ID,D"},
+             {longTtlIndexA, longTtlView, "A,ID,B"},
+             {longTtlIndexC, longTtlView, "C,ID,D"},
+         };
+         for (String[] spec : indexSpecs) {
+             conn.createStatement().execute(
+                     String.format(createIndex, spec[0], spec[1], spec[2]));
+         }
+
+         conn.setAutoCommit(true);
+
+         final String upsert = "UPSERT INTO %s (PK2,A,B,C,D,ID) VALUES(1,1,1,1,1,'%s')";
+         conn.createStatement().execute(String.format(upsert, shortTtlView, VIEW_PREFIX1));
+         conn.createStatement().execute(String.format(upsert, longTtlView, VIEW_PREFIX2));
+
+         // Give the short TTL time to elapse before the job runs.
+         Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
+
+         // Before the job: one data row and two index rows per view.
+         verifyNumberOfRowsFromHBaseLevel(baseTable, prefix1Rows, 1);
+         verifyNumberOfRowsFromHBaseLevel(baseTable, prefix2Rows, 1);
+         verifyNumberOfRowsFromHBaseLevel(viewIndexTable, prefix1Rows, 2);
+         verifyNumberOfRowsFromHBaseLevel(viewIndexTable, prefix2Rows, 2);
+
+         // Run the TTL MR job in the foreground over all views.
+         final PhoenixTTLTool ttlTool = new PhoenixTTLTool();
+         ttlTool.setConf(new Configuration(getUtility().getConfiguration()));
+         final String[] toolArgs = {"-runfg", "-a"};
+         final int exitCode = ttlTool.run(toolArgs);
+         assertEquals(0, exitCode);
+
+         // Data and index rows of the short-TTL view are gone; the other view is untouched.
+         verifyNumberOfRowsFromHBaseLevel(baseTable, prefix1Rows, 0);
+         verifyNumberOfRowsFromHBaseLevel(baseTable, prefix2Rows, 1);
+         verifyNumberOfRowsFromHBaseLevel(viewIndexTable, prefix1Rows, 0);
+         verifyNumberOfRowsFromHBaseLevel(viewIndexTable, prefix2Rows, 2);
+     }
+ }
+
+ /*
+ BaseMultiTenantTable
+ GlobalView1
+ TenantView1 TenantView2
+ */
+ @Test
+ public void testDeleteByViewAndTenant() throws Exception {
+     final String schemaName = generateUniqueName();
+     final String baseTable = schemaName + "." + generateUniqueName();
+     final String globalView = schemaName + "." + generateUniqueName();
+     final String keptTenantView = schemaName + "." + generateUniqueName();
+     final String cleanedTenantView = schemaName + "." + generateUniqueName();
+     final String tenantId = generateUniqueName();
+     final String prefix1Rows = ".*" + VIEW_PREFIX1 + ".*";
+     final String prefix2Rows = ".*" + VIEW_PREFIX2 + ".*";
+
+     try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenantConn =
+                     PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenantId)) {
+
+         createMultiTenantTable(globalConn, baseTable);
+
+         // Global view without a TTL; the TTL is set on the tenant views below.
+         final String createGlobalView = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " +
+                 "A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
+                 " AS SELECT * FROM " + baseTable + " WHERE NUM = 1";
+         globalConn.createStatement().execute(String.format(createGlobalView, globalView));
+
+         // Both tenant views sit on the same global view and share the same short TTL.
+         final String createTenantView =
+                 "CREATE VIEW %s (E BIGINT, F BIGINT) AS SELECT * FROM %s " +
+                 "WHERE ID = '%s' PHOENIX_TTL = %d";
+         final String createKeptView = String.format(createTenantView, keptTenantView,
+                 globalView, VIEW_PREFIX1, PHOENIX_TTL_EXPIRE_IN_A_SECOND);
+         final String createCleanedView = String.format(createTenantView, cleanedTenantView,
+                 globalView, VIEW_PREFIX2, PHOENIX_TTL_EXPIRE_IN_A_SECOND);
+         tenantConn.createStatement().execute(createKeptView);
+         tenantConn.createStatement().execute(createCleanedView);
+
+         tenantConn.setAutoCommit(true);
+
+         tenantConn.createStatement().execute(
+                 String.format(UPSERT_TO_LEAF_VIEW_QUERY, keptTenantView));
+         tenantConn.createStatement().execute(
+                 String.format(UPSERT_TO_LEAF_VIEW_QUERY, cleanedTenantView));
+
+         // Give the TTL time to elapse before the job runs.
+         Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
+
+         verifyNumberOfRows(baseTable, tenantId, 2, globalConn);
+         verifyNumberOfRowsFromHBaseLevel(baseTable, prefix1Rows, 1);
+         verifyNumberOfRowsFromHBaseLevel(baseTable, prefix2Rows, 1);
+
+         // Run the TTL MR job restricted to a single view of a single tenant.
+         final PhoenixTTLTool ttlTool = new PhoenixTTLTool();
+         ttlTool.setConf(new Configuration(getUtility().getConfiguration()));
+         final String[] toolArgs = {"-runfg", "-v", cleanedTenantView, "-i", tenantId};
+         final int exitCode = ttlTool.run(toolArgs);
+         assertEquals(0, exitCode);
+
+         // Only the targeted view lost its row, although both views had expired data.
+         verifyNumberOfRows(baseTable, tenantId, 1, globalConn);
+         verifyNumberOfRowsFromHBaseLevel(baseTable, prefix1Rows, 1);
+         verifyNumberOfRowsFromHBaseLevel(baseTable, prefix2Rows, 0);
+     }
+ }
+
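+ /*
+ BaseMultiTenantTable
+ GlobalView1 GlobalView2
+ TenantView1(tenant1) TenantView3(tenant2) TenantView2(tenant1) TenantView4(tenant2)
+
+ All tenant views have a TTL. Running the MR job for tenant1 only should delete
+ tenant1's expired rows and leave tenant2's rows untouched.
+ */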
+ @Test
+ public void testDeleteByTenant() throws Exception {
+     final String schemaName = generateUniqueName();
+     final String baseTable = schemaName + "." + generateUniqueName();
+     final String globalViewA = schemaName + "." + generateUniqueName();
+     final String globalViewB = schemaName + "." + generateUniqueName();
+     final String tenant1ViewOnA = schemaName + "." + generateUniqueName();
+     final String tenant1ViewOnB = schemaName + "." + generateUniqueName();
+     final String tenant2ViewOnA = schemaName + "." + generateUniqueName();
+     final String tenant2ViewOnB = schemaName + "." + generateUniqueName();
+     final String cleanedTenant = generateUniqueName();
+     final String untouchedTenant = generateUniqueName();
+     // Regexes passed to verifyNumberOfRowsFromHBaseLevel to count the rows of each tenant.
+     final String cleanedTenantRows = ".*" + cleanedTenant + ".*";
+     final String untouchedTenantRows = ".*" + untouchedTenant + ".*";
+
+     try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection cleanedConn =
+                     PhoenixMultiInputUtil.buildTenantConnection(getUrl(), cleanedTenant);
+             Connection untouchedConn =
+                     PhoenixMultiInputUtil.buildTenantConnection(getUrl(), untouchedTenant)) {
+
+         createMultiTenantTable(globalConn, baseTable);
+
+         // Two global views without a TTL, distinguished by their NUM filter.
+         final String createGlobalView = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " +
+                 "A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
+                 " AS SELECT * FROM " + baseTable + " WHERE NUM = %d";
+         globalConn.createStatement().execute(String.format(createGlobalView, globalViewA, 1));
+         globalConn.createStatement().execute(String.format(createGlobalView, globalViewB, 2));
+
+         // Each tenant gets one short-TTL view on top of each global view.
+         final String createTenantView =
+                 "CREATE VIEW %s (E BIGINT, F BIGINT) AS SELECT * FROM %s " +
+                 "WHERE ID = '%s' PHOENIX_TTL = %d";
+         cleanedConn.createStatement().execute(String.format(createTenantView,
+                 tenant1ViewOnA, globalViewA, VIEW_PREFIX1, PHOENIX_TTL_EXPIRE_IN_A_SECOND));
+         cleanedConn.createStatement().execute(String.format(createTenantView,
+                 tenant1ViewOnB, globalViewB, VIEW_PREFIX2, PHOENIX_TTL_EXPIRE_IN_A_SECOND));
+         untouchedConn.createStatement().execute(String.format(createTenantView,
+                 tenant2ViewOnA, globalViewA, VIEW_PREFIX1, PHOENIX_TTL_EXPIRE_IN_A_SECOND));
+         untouchedConn.createStatement().execute(String.format(createTenantView,
+                 tenant2ViewOnB, globalViewB, VIEW_PREFIX2, PHOENIX_TTL_EXPIRE_IN_A_SECOND));
+
+         cleanedConn.setAutoCommit(true);
+         untouchedConn.setAutoCommit(true);
+
+         cleanedConn.createStatement().execute(
+                 String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenant1ViewOnA));
+         cleanedConn.createStatement().execute(
+                 String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenant1ViewOnB));
+         untouchedConn.createStatement().execute(
+                 String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenant2ViewOnA));
+         untouchedConn.createStatement().execute(
+                 String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenant2ViewOnB));
+
+         // Give the TTL time to elapse before the job runs.
+         Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
+
+         verifyNumberOfRowsFromHBaseLevel(baseTable, cleanedTenantRows, 2);
+         verifyNumberOfRowsFromHBaseLevel(baseTable, untouchedTenantRows, 2);
+
+         // Run the TTL MR job restricted to the first tenant.
+         final PhoenixTTLTool ttlTool = new PhoenixTTLTool();
+         ttlTool.setConf(new Configuration(getUtility().getConfiguration()));
+         final String[] toolArgs = {"-runfg", "-i", cleanedTenant};
+         final int exitCode = ttlTool.run(toolArgs);
+         assertEquals(0, exitCode);
+
+         // The other tenant keeps its rows even though they are expired as well.
+         verifyNumberOfRowsFromHBaseLevel(baseTable, untouchedTenantRows, 2);
+         verifyNumberOfRowsFromHBaseLevel(baseTable, cleanedTenantRows, 0);
+     }
+ }
+
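+ /*
+ BaseMultiTenantTable
+ GlobalView1 with TTL GlobalView2 with TTL
+ TenantView1(tenant1) TenantView3(tenant2) TenantView2(tenant1) TenantView4(tenant2)
+
+ Running the MR job for GlobalView1 only should delete the expired rows written
+ under GlobalView1 and leave the rows under GlobalView2 untouched.
+ */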
+ @Test
+ public void testDeleteByViewName() throws Exception {
+     final String schemaName = generateUniqueName();
+     final String baseTable = schemaName + "." + generateUniqueName();
+     final String cleanedGlobalView = schemaName + "." + generateUniqueName();
+     final String untouchedGlobalView = schemaName + "." + generateUniqueName();
+     final String tenant1ViewOnCleaned = schemaName + "." + generateUniqueName();
+     final String tenant1ViewOnUntouched = schemaName + "." + generateUniqueName();
+     final String tenant2ViewOnCleaned = schemaName + "." + generateUniqueName();
+     final String tenant2ViewOnUntouched = schemaName + "." + generateUniqueName();
+     final String firstTenant = generateUniqueName();
+     final String secondTenant = generateUniqueName();
+     final String prefix1Rows = ".*" + VIEW_PREFIX1 + ".*";
+     final String prefix2Rows = ".*" + VIEW_PREFIX2 + ".*";
+
+     try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection firstTenantConn =
+                     PhoenixMultiInputUtil.buildTenantConnection(getUrl(), firstTenant);
+             Connection secondTenantConn =
+                     PhoenixMultiInputUtil.buildTenantConnection(getUrl(), secondTenant)) {
+
+         createMultiTenantTable(globalConn, baseTable);
+
+         // Here the TTL lives on the global views; the tenant views below carry none.
+         final String createGlobalView = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " +
+                 "A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
+                 " AS SELECT * FROM " + baseTable + " WHERE NUM = %d PHOENIX_TTL = %d";
+         globalConn.createStatement().execute(String.format(createGlobalView,
+                 cleanedGlobalView, 1, PHOENIX_TTL_EXPIRE_IN_A_SECOND));
+         globalConn.createStatement().execute(String.format(createGlobalView,
+                 untouchedGlobalView, 2, PHOENIX_TTL_EXPIRE_IN_A_SECOND));
+
+         // Each tenant gets one view on top of each global view.
+         final String createTenantView =
+                 "CREATE VIEW %s (E BIGINT, F BIGINT) AS SELECT * FROM %s WHERE ID = '%s'";
+         firstTenantConn.createStatement().execute(String.format(createTenantView,
+                 tenant1ViewOnCleaned, cleanedGlobalView, VIEW_PREFIX1));
+         firstTenantConn.createStatement().execute(String.format(createTenantView,
+                 tenant1ViewOnUntouched, untouchedGlobalView, VIEW_PREFIX2));
+         secondTenantConn.createStatement().execute(String.format(createTenantView,
+                 tenant2ViewOnCleaned, cleanedGlobalView, VIEW_PREFIX1));
+         secondTenantConn.createStatement().execute(String.format(createTenantView,
+                 tenant2ViewOnUntouched, untouchedGlobalView, VIEW_PREFIX2));
+
+         firstTenantConn.setAutoCommit(true);
+         secondTenantConn.setAutoCommit(true);
+
+         firstTenantConn.createStatement().execute(
+                 String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenant1ViewOnCleaned));
+         firstTenantConn.createStatement().execute(
+                 String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenant1ViewOnUntouched));
+         secondTenantConn.createStatement().execute(
+                 String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenant2ViewOnCleaned));
+         secondTenantConn.createStatement().execute(
+                 String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenant2ViewOnUntouched));
+
+         // Give the TTL time to elapse before the job runs.
+         Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
+
+         verifyNumberOfRowsFromHBaseLevel(baseTable, prefix1Rows, 2);
+         verifyNumberOfRowsFromHBaseLevel(baseTable, prefix2Rows, 2);
+
+         // Run the TTL MR job restricted to the first global view.
+         final PhoenixTTLTool ttlTool = new PhoenixTTLTool();
+         ttlTool.setConf(new Configuration(getUtility().getConfiguration()));
+         final String[] toolArgs = {"-runfg", "-v", cleanedGlobalView};
+         final int exitCode = ttlTool.run(toolArgs);
+         assertEquals(0, exitCode);
+
+         // Rows under the second global view stay, although they are expired as well.
+         verifyNumberOfRowsFromHBaseLevel(baseTable, prefix2Rows, 2);
+         verifyNumberOfRowsFromHBaseLevel(baseTable, prefix1Rows, 0);
+     }
+ }
+
+
+ /*
+ BaseTable
+ GlobalView1 (no TTL)
+ MiddleLevelView1 with TTL(1 ms) MiddleLevelView2 with TTL(1 DAY)
+ LeafView1 LeafView2
+ Upsert data into both leaf views, then run the MR job.
+ It should delete only the MiddleLevelView1 data and must not remove the MiddleLevelView2 data.
+ */
+ @Test
+ public void testCleanMoreThanThreeLevelViewCase() throws Exception {
+     final String schemaName = generateUniqueName();
+     final String baseTable = schemaName + "." + generateUniqueName();
+     final String globalView = schemaName + "." + generateUniqueName();
+     final String shortTtlMiddleView = schemaName + "." + generateUniqueName();
+     final String longTtlMiddleView = schemaName + "." + generateUniqueName();
+     final String shortTtlLeafView = schemaName + "." + generateUniqueName();
+     final String longTtlLeafView = schemaName + "." + generateUniqueName();
+     final String prefix1Rows = ".*" + VIEW_PREFIX1 + ".*";
+     final String prefix2Rows = ".*" + VIEW_PREFIX2 + ".*";
+
+     try (Connection conn = DriverManager.getConnection(getUrl())) {
+         final String createBaseTable = "CREATE TABLE " + baseTable +
+                 " (ID CHAR(10) NOT NULL PRIMARY KEY, NUM BIGINT)";
+         conn.createStatement().execute(createBaseTable);
+
+         // Level 1: a global view over the whole base table, without a TTL.
+         final String createGlobalView = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " +
+                 "A BIGINT, B BIGINT)" + " AS SELECT * FROM " + baseTable;
+         conn.createStatement().execute(String.format(createGlobalView, globalView));
+
+         // Level 2: the middle views carry the TTL (one short, one long).
+         final String createMiddleView = "CREATE VIEW %s (C BIGINT, D BIGINT)" +
+                 " AS SELECT * FROM %s WHERE ID ='%s' PHOENIX_TTL = %d";
+         conn.createStatement().execute(String.format(createMiddleView,
+                 shortTtlMiddleView, globalView, VIEW_PREFIX1,
+                 PHOENIX_TTL_EXPIRE_IN_A_SECOND));
+         conn.createStatement().execute(String.format(createMiddleView,
+                 longTtlMiddleView, globalView, VIEW_PREFIX2,
+                 PHOENIX_TTL_EXPIRE_IN_A_DAY));
+
+         // Level 3: one leaf view per middle view; the data is written through these.
+         final String createLeafView = "CREATE VIEW %s (E BIGINT, F BIGINT)" +
+                 " AS SELECT * FROM %s";
+         conn.createStatement().execute(
+                 String.format(createLeafView, shortTtlLeafView, shortTtlMiddleView));
+         conn.createStatement().execute(
+                 String.format(createLeafView, longTtlLeafView, longTtlMiddleView));
+
+         conn.setAutoCommit(true);
+
+         conn.createStatement().execute(
+                 String.format(UPSERT_TO_LEAF_VIEW_QUERY, shortTtlLeafView));
+         conn.createStatement().execute(
+                 String.format(UPSERT_TO_LEAF_VIEW_QUERY, longTtlLeafView));
+
+         // Give the short TTL time to elapse before the job runs.
+         Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
+
+         verifyNumberOfRowsFromHBaseLevel(baseTable, prefix1Rows, 1);
+         verifyNumberOfRowsFromHBaseLevel(baseTable, prefix2Rows, 1);
+
+         // Run the TTL MR job in the foreground over all views.
+         final PhoenixTTLTool ttlTool = new PhoenixTTLTool();
+         ttlTool.setConf(new Configuration(getUtility().getConfiguration()));
+         final String[] toolArgs = {"-runfg", "-a"};
+         final int exitCode = ttlTool.run(toolArgs);
+         assertEquals(0, exitCode);
+
+         // Only the row under the short-TTL middle view must be gone.
+         verifyNumberOfRowsFromHBaseLevel(baseTable, prefix2Rows, 1);
+         verifyNumberOfRowsFromHBaseLevel(baseTable, prefix1Rows, 0);
+     }
+ }
+
+ @Test
+ public void testNoViewCase() throws Exception {
+     // No table or view is created by this test; the job is still expected to exit with 0.
+     final PhoenixTTLTool ttlTool = new PhoenixTTLTool();
+     ttlTool.setConf(new Configuration(getUtility().getConfiguration()));
+     final String[] toolArgs = {"-runfg", "-a"};
+     final int exitCode = ttlTool.run(toolArgs);
+     assertEquals(0, exitCode);
+ }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormat.java
new file mode 100644
index 0000000..9d7da2e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormat.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.phoenix.mapreduce.util.DefaultPhoenixMultiViewListProvider;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiViewListProvider;
+import org.apache.phoenix.mapreduce.util.DefaultMultiViewSplitStrategy;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable;
+import org.apache.phoenix.mapreduce.util.MultiViewSplitStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/*
+ Generic multi-view InputFormat used by MR jobs whose input is a list of views.
+ Both the view list provider and the split strategy are pluggable: to customize them for
+ your own business needs, set the configuration keys below to the names of your own classes:
+ MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ (must implement PhoenixMultiViewListProvider)
+ MAPREDUCE_MULTI_INPUT_SPLIT_STRATEGY_CLAZZ (must implement MultiViewSplitStrategy)
+ Both classes are instantiated reflectively through their no-argument constructor.
+ */
+public class PhoenixMultiViewInputFormat<T extends Writable> extends InputFormat<NullWritable,T> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixMultiViewInputFormat.class);
+
+    /**
+     * Builds the input splits: asks the configured (or default) view list provider for the
+     * views, then lets the configured (or default) split strategy group them into splits.
+     *
+     * @param context job context supplying the configuration
+     * @return the generated splits; an empty list if the strategy returned null
+     * @throws IOException if a configured class cannot be loaded or instantiated; the
+     *         reflective exception is attached as the cause
+     */
+    @Override
+    public List<InputSplit> getSplits(JobContext context) throws IOException {
+        List<InputSplit> listOfInputSplit;
+        try {
+            final Configuration configuration = context.getConfiguration();
+
+            Class<?> viewListProviderClazz = DefaultPhoenixMultiViewListProvider.class;
+            final String viewListProviderName = configuration.get(
+                    PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ);
+            if (viewListProviderName != null) {
+                viewListProviderClazz = Class.forName(viewListProviderName);
+            }
+            PhoenixMultiViewListProvider phoenixMultiViewListProvider =
+                    (PhoenixMultiViewListProvider) viewListProviderClazz.newInstance();
+            List<ViewInfoWritable> views =
+                    phoenixMultiViewListProvider.getPhoenixMultiViewList(configuration);
+
+            Class<?> splitStrategyClazz = DefaultMultiViewSplitStrategy.class;
+            final String splitStrategyName = configuration.get(
+                    PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_SPLIT_STRATEGY_CLAZZ);
+            if (splitStrategyName != null) {
+                splitStrategyClazz = Class.forName(splitStrategyName);
+            }
+            MultiViewSplitStrategy multiViewSplitStrategy =
+                    (MultiViewSplitStrategy) splitStrategyClazz.newInstance();
+            listOfInputSplit = multiViewSplitStrategy.generateSplits(views, configuration);
+        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+            final String message = "PhoenixMultiViewInputFormat is getting " +
+                    e.getClass().getSimpleName() + " : " + e.getMessage();
+            LOGGER.debug(message);
+            // Chain the exception itself: e.getCause() is usually null for these types, which
+            // would drop the stack trace that tells which class failed to load or instantiate.
+            throw new IOException(message, e);
+        }
+
+        return listOfInputSplit == null ? new ArrayList<InputSplit>() : listOfInputSplit;
+    }
+
+    /**
+     * Creates the reader that iterates over the views of one split.
+     *
+     * @param split the split to read (not used here; the reader receives it in initialize)
+     * @param context task context supplying the configuration
+     * @return a new PhoenixMultiViewReader for the configured input class
+     */
+    @Override
+    public RecordReader<NullWritable,T> createRecordReader(InputSplit split,
+            TaskAttemptContext context) {
+        final Configuration configuration = context.getConfiguration();
+
+        // The configured input class is untyped; the job setup is trusted to match T.
+        @SuppressWarnings("unchecked")
+        final Class<T> inputClass =
+                (Class<T>) PhoenixConfigurationUtil.getInputClass(configuration);
+        return getPhoenixRecordReader(inputClass, configuration);
+    }
+
+    // Factory hook for the record reader handed to the framework.
+    private RecordReader<NullWritable,T> getPhoenixRecordReader(Class<T> inputClass,
+            Configuration configuration) {
+        return new PhoenixMultiViewReader<>(inputClass , configuration);
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputSplit.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputSplit.java
new file mode 100644
index 0000000..37f4218
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputSplit.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.phoenix.mapreduce.util.ViewInfoTracker;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/*
+ InputSplit that carries a list of views for the MR job and knows how to serialize it.
+ Only ViewInfoTracker entries are written by write(); readFields() restores every entry as
+ a ViewInfoTracker.
+ */
+public class PhoenixMultiViewInputSplit extends InputSplit implements Writable {
+
+    List<ViewInfoWritable> viewInfoTrackerList;
+
+    /** Creates an empty split; the content is filled in by readFields. */
+    public PhoenixMultiViewInputSplit() {
+        this.viewInfoTrackerList = new ArrayList<>();
+    }
+
+    /**
+     * Creates a split over the given views.
+     *
+     * @param viewInfoTracker views carried by this split; null is treated as an empty list
+     */
+    public PhoenixMultiViewInputSplit(List<ViewInfoWritable> viewInfoTracker) {
+        this.viewInfoTrackerList =
+                viewInfoTracker == null ? new ArrayList<ViewInfoWritable>() : viewInfoTracker;
+    }
+
+    /**
+     * Writes the number of serializable views followed by each of them.
+     *
+     * @param output sink to write to
+     * @throws IOException if writing fails
+     */
+    @Override
+    public void write(DataOutput output) throws IOException {
+        // Collect first so the count written up front matches the entries that follow;
+        // readFields relies on that count to know how many entries to read back.
+        final List<ViewInfoWritable> serializable = new ArrayList<>();
+        if (this.viewInfoTrackerList != null) {
+            for (ViewInfoWritable viewInfoWritable : this.viewInfoTrackerList) {
+                if (viewInfoWritable instanceof ViewInfoTracker) {
+                    serializable.add(viewInfoWritable);
+                }
+            }
+        }
+        WritableUtils.writeVInt(output, serializable.size());
+        for (ViewInfoWritable viewInfoWritable : serializable) {
+            viewInfoWritable.write(output);
+        }
+    }
+
+    /**
+     * Replaces the content of this split with the views read from the input.
+     *
+     * @param input source to read from
+     * @throws IOException if reading fails
+     */
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        int count = WritableUtils.readVInt(input);
+        // Build a fresh list instead of appending: a reused instance must not keep entries
+        // of a previous deserialization, and a caller-supplied list may not be growable.
+        final List<ViewInfoWritable> restored = new ArrayList<>();
+        for (int i = 0; i < count; i++) {
+            ViewInfoTracker viewInfoTracker = new ViewInfoTracker();
+            viewInfoTracker.readFields(input);
+            restored.add(viewInfoTracker);
+        }
+        this.viewInfoTrackerList = restored;
+    }
+
+    /** Always reports a length of 0. */
+    @Override
+    public long getLength() {
+        return 0;
+    }
+
+    /** Always reports no locations. */
+    @Override
+    public String[] getLocations() {
+        return new String[0];
+    }
+
+    /** Returns the views carried by this split (the live list, not a copy). */
+    public List<ViewInfoWritable> getViewInfoTrackerList() {
+        return this.viewInfoTrackerList;
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewReader.java
new file mode 100644
index 0000000..f1d7662
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewReader.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * RecordReader that hands out the views of a {@link PhoenixMultiViewInputSplit} one at a
+ * time. The key is always the NullWritable singleton; the value is the current
+ * ViewInfoWritable of the split, cast to T.
+ */
+public class PhoenixMultiViewReader<T extends Writable> extends RecordReader<NullWritable,T> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixMultiViewReader.class);
+
+    private Configuration configuration;
+    private Class<T> inputClass;
+    Iterator<ViewInfoWritable> it;
+    // Record selected by the last successful nextKeyValue(); null before the first call
+    // and after the views are exhausted.
+    private ViewInfoWritable currentValue;
+    // Bookkeeping for getProgress().
+    private int totalViews;
+    private int consumedViews;
+
+    public PhoenixMultiViewReader() {
+
+    }
+
+    public PhoenixMultiViewReader(final Class<T> inputClass, final Configuration configuration) {
+        this.configuration = configuration;
+        this.inputClass = inputClass;
+    }
+
+    /**
+     * Positions the reader before the first view of the given split.
+     *
+     * @param split must be a PhoenixMultiViewInputSplit
+     * @param context task context (not used)
+     * @throws IOException if the split has another type or carries no view list
+     */
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context)
+            throws IOException {
+        if (split instanceof PhoenixMultiViewInputSplit) {
+            final PhoenixMultiViewInputSplit pSplit = (PhoenixMultiViewInputSplit)split;
+            final List<ViewInfoWritable> viewInfoTracker = pSplit.getViewInfoTrackerList();
+            if (viewInfoTracker == null) {
+                throw new IOException("PhoenixMultiViewInputSplit does not carry a view list");
+            }
+            it = viewInfoTracker.iterator();
+            totalViews = viewInfoTracker.size();
+            consumedViews = 0;
+            currentValue = null;
+        } else {
+            LOGGER.error("InputSplit class cannot cast to PhoenixMultiViewInputSplit.");
+            throw new IOException("InputSplit class cannot cast to PhoenixMultiViewInputSplit");
+        }
+    }
+
+    /**
+     * Advances to the next view.
+     *
+     * @return true if a view was selected, false once the views are exhausted (or the
+     *         reader was never initialized)
+     */
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+        // Advancing happens here so that getCurrentValue() can be called any number of
+        // times for the same record without skipping views.
+        if (it != null && it.hasNext()) {
+            currentValue = it.next();
+            consumedViews++;
+            return true;
+        }
+        currentValue = null;
+        return false;
+    }
+
+    /** Returns the NullWritable singleton; this reader produces no meaningful key. */
+    @Override
+    public NullWritable getCurrentKey() throws IOException, InterruptedException {
+        return NullWritable.get();
+    }
+
+    /**
+     * Returns the view selected by the last nextKeyValue() call.
+     *
+     * @return the current view, or null if there is none
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public T getCurrentValue() throws IOException, InterruptedException {
+        return (T)currentValue;
+    }
+
+    /** Returns the fraction of views handed out so far, between 0 and 1. */
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+        if (totalViews <= 0) {
+            return 0;
+        }
+        return Math.min(1.0f, (float) consumedViews / totalViews);
+    }
+
+    /** Nothing to release: the reader only iterates over an in-memory list. */
+    @Override
+    public void close() throws IOException {
+
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTTLDeleteJobMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTTLDeleteJobMapper.java
new file mode 100644
index 0000000..3e104fd
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTTLDeleteJobMapper.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.ViewInfoTracker;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable.ViewInfoJobState;
+import org.apache.phoenix.mapreduce.util.MultiViewJobStatusTracker;
+import org.apache.phoenix.mapreduce.util.DefaultMultiViewJobStatusTracker;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * Mapper of the Phoenix TTL deletion job. Each input value describes one view (or one view
+ * index); the mapper runs a count(*) scan over it with the
+ * {@code DELETE_PHOENIX_TTL_EXPIRED} scan attribute set and reports the outcome through a
+ * {@link MultiViewJobStatusTracker} and the {@link PhoenixTTLTool.MR_COUNTER_METRICS}
+ * counters.
+ */
+public class PhoenixTTLDeleteJobMapper extends Mapper<NullWritable, ViewInfoTracker,
+        NullWritable, NullWritable> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixTTLDeleteJobMapper.class);
+    private MultiViewJobStatusTracker multiViewJobStatusTracker;
+    private static final int DEFAULT_MAX_RETRIES = 3;
+    private static final int DEFAULT_RETRY_SLEEP_TIME_IN_MS = 10000;
+
+    /*
+     * Instantiates the status tracker: the class named by
+     * MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ when that key is set, otherwise
+     * DefaultMultiViewJobStatusTracker.
+     */
+    private void initMultiViewJobStatusTracker(Configuration config) throws Exception {
+        try {
+            Class<?> defaultViewDeletionTrackerClass = DefaultMultiViewJobStatusTracker.class;
+            final String customTrackerClassName = config.get(
+                    PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ);
+            if (customTrackerClassName != null) {
+                LOGGER.info("Using customized tracker class : " + customTrackerClassName);
+                defaultViewDeletionTrackerClass = Class.forName(customTrackerClassName);
+            } else {
+                LOGGER.info("Using default tracker class ");
+            }
+            this.multiViewJobStatusTracker = (MultiViewJobStatusTracker)
+                    defaultViewDeletionTrackerClass.newInstance();
+        } catch (Exception e) {
+            LOGGER.error("Getting exception While initializing initMultiViewJobStatusTracker " +
+                    "with error message " + e.getMessage(), e);
+            throw e;
+        }
+    }
+
+    /**
+     * Processes one view (or view index) entry of the split.
+     *
+     * @throws IOException wrapping any failure that escapes the per-view handling; the
+     *                     original exception is kept as the cause
+     */
+    @Override
+    protected void map(NullWritable key, ViewInfoTracker value, Context context)
+            throws IOException {
+        try {
+            final Configuration config = context.getConfiguration();
+
+            // The tracker is created lazily on the first record.
+            if (this.multiViewJobStatusTracker == null) {
+                initMultiViewJobStatusTracker(config);
+            }
+
+            LOGGER.debug(String.format("Deleting from view %s, TenantID %s, and TTL value: %d",
+                    value.getViewName(), value.getTenantId(), value.getPhoenixTtl()));
+
+            deleteExpiredRows(value, config, context);
+
+        } catch (SQLException e) {
+            LOGGER.error("Mapper got an exception while deleting expired rows : "
+                    + e.getMessage(), e);
+            // Chain the exception itself (not only its cause) so the stack trace survives.
+            throw new IOException(e.getMessage(), e);
+        } catch (Exception e) {
+            LOGGER.error("Getting IOException while running View TTL Deletion Job mapper " +
+                    "with error : " + e.getMessage(), e);
+            throw new IOException(e.getMessage(), e);
+        }
+    }
+
+    /*
+     * Opens the connection to use for the view: a tenant-specific connection when the entry
+     * carries a tenant id other than the literal "NULL", the global connection otherwise.
+     */
+    private void deleteExpiredRows(ViewInfoTracker value, Configuration config, Context context)
+            throws Exception {
+        try (PhoenixConnection connection =
+                (PhoenixConnection) ConnectionUtil.getInputConnection(config)) {
+            if (value.getTenantId() != null && !value.getTenantId().equals("NULL")) {
+                Properties props = new Properties();
+                props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, value.getTenantId());
+
+                try (PhoenixConnection tenantConnection = (PhoenixConnection)
+                        DriverManager.getConnection(connection.getURL(), props)) {
+                    deleteExpiredRows(tenantConnection, value, config, context);
+                }
+            } else {
+                deleteExpiredRows(connection, value, config, context);
+            }
+        }
+    }
+
+    /*
+     * Each Mapper that receives a MultiPhoenixViewInputSplit will execute a DeleteMutation/Scan
+     * (With DELETE_TTL_EXPIRED attribute) per view for all the views and view indexes in the split.
+     * For each DeleteMutation, it bounded by the view start and stop keys for the region and
+     * TTL attributes and Delete Hint.
+     *
+     * Failures are logged and not rethrown, so one bad view does not fail the remaining
+     * entries handled by this mapper.
+     */
+    private void deleteExpiredRows(PhoenixConnection connection, ViewInfoTracker viewInfoTracker,
+                                   Configuration config, Context context) throws Exception {
+        try (PhoenixStatement pstmt =
+                new PhoenixStatement(connection).unwrap(PhoenixStatement.class)) {
+            PTable pTable = PhoenixRuntime.getTable(connection, viewInfoTracker.getViewName());
+            String deleteIfExpiredStatement = "SELECT /*+ NO_INDEX */ count(*) FROM " +
+                    viewInfoTracker.getViewName();
+
+            // For a view index entry, scan the index relation instead of the view itself.
+            if (viewInfoTracker.isIndexRelation()) {
+                pTable = PhoenixRuntime.getTable(connection, viewInfoTracker.getRelationName());
+                deleteIfExpiredStatement = "SELECT count(*) FROM " +
+                        viewInfoTracker.getRelationName();
+            }
+
+            String sourceTableName = pTable.getTableName().getString();
+            this.multiViewJobStatusTracker.updateJobStatus(viewInfoTracker, 0,
+                    ViewInfoJobState.INITIALIZED.getValue(), config, 0, context.getJobName());
+            final QueryPlan queryPlan = pstmt.optimizeQuery(deleteIfExpiredStatement);
+            final Scan scan = queryPlan.getContext().getScan();
+            byte[] emptyColumnFamilyName = SchemaUtil.getEmptyColumnFamily(pTable);
+            byte[] emptyColumnName = pTable.getEncodingScheme() ==
+                    PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
+                    QueryConstants.EMPTY_COLUMN_BYTES :
+                    pTable.getEncodingScheme().encode(
+                            QueryConstants.ENCODED_EMPTY_COLUMN_NAME);
+
+            scan.setAttribute(
+                    BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyColumnFamilyName);
+            scan.setAttribute(
+                    BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyColumnName);
+            scan.setAttribute(
+                    BaseScannerRegionObserver.DELETE_PHOENIX_TTL_EXPIRED, PDataType.TRUE_BYTES);
+            scan.setAttribute(
+                    BaseScannerRegionObserver.PHOENIX_TTL,
+                    Bytes.toBytes(viewInfoTracker.getPhoenixTtl()));
+            scan.setAttribute(
+                    BaseScannerRegionObserver.PHOENIX_TTL_SCAN_TABLE_NAME,
+                    Bytes.toBytes(sourceTableName));
+
+            this.multiViewJobStatusTracker.updateJobStatus(viewInfoTracker, 0,
+                    ViewInfoJobState.RUNNING.getValue(), config, 0, context.getJobName());
+
+            addingDeletionMarkWithRetries(pstmt, viewInfoTracker, config, context,
+                    queryPlan);
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // The exception is not rethrown, so keep the interrupt visible to the caller.
+                Thread.currentThread().interrupt();
+            }
+            if (e instanceof SQLException && ((SQLException) e).getErrorCode() ==
+                    SQLExceptionCode.TABLE_UNDEFINED.getErrorCode()) {
+                this.multiViewJobStatusTracker.updateJobStatus(viewInfoTracker, 0,
+                        ViewInfoJobState.DELETED.getValue(), config, 0, context.getJobName());
+            }
+            LOGGER.error(String.format("Had an issue to process the view: %s, " +
+                    "see error %s ", viewInfoTracker.toString(),e.getMessage()), e);
+        }
+    }
+
+    /*
+     * Runs the prepared scan, retrying up to DEFAULT_MAX_RETRIES times with a fixed sleep
+     * between attempts.
+     *
+     * Returns true when the scan succeeded, false when the view no longer exists
+     * (TABLE_UNDEFINED). Rethrows the last exception once the retries are exhausted.
+     */
+    private boolean addingDeletionMarkWithRetries(PhoenixStatement stmt,
+                                                  ViewInfoTracker viewInfoTracker,
+                                                  Configuration config, Context context,
+                                                  QueryPlan queryPlan)
+            throws Exception {
+        int retry = 0;
+        long startTime = System.currentTimeMillis();
+        String viewInfo = viewInfoTracker.getTenantId() == null ?
+                viewInfoTracker.getViewName() : viewInfoTracker.getTenantId()
+                + "." + viewInfoTracker.getViewName();
+
+        while (retry < DEFAULT_MAX_RETRIES) {
+            try {
+                long numberOfDeletedRows = 0;
+                // Close the result set on every attempt so a retry does not leak the
+                // previous one.
+                try (PhoenixResultSet rs = stmt.newResultSet(
+                        queryPlan.iterator(), queryPlan.getProjector(), queryPlan.getContext())) {
+                    if (rs.next()) {
+                        numberOfDeletedRows = rs.getLong(1);
+                    }
+                }
+                this.multiViewJobStatusTracker.updateJobStatus(viewInfoTracker, numberOfDeletedRows,
+                        ViewInfoJobState.SUCCEEDED.getValue(), config,
+                        System.currentTimeMillis() - startTime, context.getJobName());
+                PhoenixTTLTool.MR_COUNTER_METRICS metricsStatus =
+                        viewInfoTracker.isIndexRelation() ?
+                                PhoenixTTLTool.MR_COUNTER_METRICS.VIEW_INDEX_SUCCEED :
+                                PhoenixTTLTool.MR_COUNTER_METRICS.VIEW_SUCCEED;
+                context.getCounter(metricsStatus).increment(1);
+                return true;
+            } catch (Exception e) {
+                PhoenixTTLTool.MR_COUNTER_METRICS metricsStatus =
+                        viewInfoTracker.isIndexRelation() ?
+                                PhoenixTTLTool.MR_COUNTER_METRICS.VIEW_INDEX_FAILED :
+                                PhoenixTTLTool.MR_COUNTER_METRICS.VIEW_FAILED;
+                // A view dropped while the job runs is not retried.
+                if (e instanceof SQLException && ((SQLException) e).getErrorCode() ==
+                        SQLExceptionCode.TABLE_UNDEFINED.getErrorCode()) {
+                    LOGGER.info(viewInfo + " has been deleted : " + e.getMessage());
+                    this.multiViewJobStatusTracker.updateJobStatus(viewInfoTracker, 0,
+                            ViewInfoJobState.DELETED.getValue(), config, 0, context.getJobName());
+                    context.getCounter(metricsStatus).increment(1);
+                    return false;
+                }
+                retry++;
+
+                if (retry == DEFAULT_MAX_RETRIES) {
+                    LOGGER.error("Deleting " + viewInfo + " expired rows has an exception for : "
+                            + e.getMessage(), e);
+                    this.multiViewJobStatusTracker.updateJobStatus(viewInfoTracker, 0,
+                            ViewInfoJobState.FAILED.getValue(), config, 0, context.getJobName());
+                    context.getCounter(metricsStatus).increment(1);
+                    throw e;
+                } else {
+                    Thread.sleep(DEFAULT_RETRY_SLEEP_TIME_IN_MS);
+                }
+            }
+        }
+        return false;
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTTLTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTTLTool.java
new file mode 100644
index 0000000..153e7be
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTTLTool.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.util.Properties;
+
+/**
+ * Command line tool that configures and submits the MapReduce job which deletes rows whose
+ * Phoenix TTL has expired, for all views, for a single view, or for a tenant.
+ */
+public class PhoenixTTLTool extends Configured implements Tool {
+    private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixTTLTool.class);
+
+    /** Job counters incremented by the mapper, once per processed view or view index. */
+    public static enum MR_COUNTER_METRICS {
+        VIEW_FAILED,
+        VIEW_SUCCEED,
+        VIEW_INDEX_FAILED,
+        VIEW_INDEX_SUCCEED
+    }
+
+    public static final String DELETE_ALL_VIEWS = "DELETE_ALL_VIEWS";
+    public static final int DEFAULT_MAPPER_SPLIT_SIZE = 10;
+    public static final int DEFAULT_QUERY_BATCH_SIZE = 100;
+
+    private static final Option DELETE_ALL_VIEWS_OPTION = new Option("a", "all", false,
+            "Delete all views from all tables.");
+    private static final Option VIEW_NAME_OPTION = new Option("v", "view", true,
+            "Delete Phoenix View Name");
+    private static final Option TENANT_ID_OPTION = new Option("i", "id", true,
+            "Delete an view based on the tenant id.");
+    private static final Option JOB_PRIORITY_OPTION = new Option("p", "job-priority", true,
+            "Define job priority from 0(highest) to 4");
+    private static final Option SPLIT_SIZE_OPTION = new Option("s", "split-size-per-mapper", true,
+            "Define split size for each mapper.");
+    private static final Option BATCH_SIZE_OPTION = new Option("b", "batch-size-for-query-more", true,
+            "Define batch size for fetching views metadata from syscat.");
+    private static final Option RUN_FOREGROUND_OPTION = new Option("runfg",
+            "run-foreground", false, "If specified, runs PhoenixTTLTool " +
+            "in Foreground. Default - Runs the build in background");
+
+    private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
+
+    private Configuration configuration;
+    private Connection connection;
+    private String viewName;
+    private String tenantId;
+    private String jobName;
+    private boolean isDeletingAllViews;
+    private JobPriority jobPriority;
+    private boolean isForeground;
+    private int splitSize;
+    private int batchSize;
+    private Job job;
+
+    /**
+     * Parses the command line and stores the selected options on this instance.
+     * Prints the usage and exits the JVM when the arguments do not select a deletion job.
+     *
+     * @param args the raw command line arguments
+     * @throws NumberFormatException if the split size or batch size is not an integer
+     */
+    public void parseArgs(String[] args) {
+        CommandLine cmdLine;
+        try {
+            cmdLine = parseOptions(args);
+        } catch (IllegalStateException e) {
+            printHelpAndExit(e.getMessage(), getOptions());
+            throw e;
+        }
+
+        if (getConf() == null) {
+            setConf(HBaseConfiguration.create());
+        }
+
+        // "-a" takes precedence over "-v" when both are given.
+        if (cmdLine.hasOption(DELETE_ALL_VIEWS_OPTION.getOpt())) {
+            this.isDeletingAllViews = true;
+        } else if (cmdLine.hasOption(VIEW_NAME_OPTION.getOpt())) {
+            viewName = cmdLine.getOptionValue(VIEW_NAME_OPTION.getOpt());
+            this.isDeletingAllViews = false;
+        }
+
+        if (cmdLine.hasOption(TENANT_ID_OPTION.getOpt())) {
+            tenantId = cmdLine.getOptionValue((TENANT_ID_OPTION.getOpt()));
+        }
+
+        if (cmdLine.hasOption(SPLIT_SIZE_OPTION.getOpt())) {
+            splitSize = Integer.parseInt(cmdLine.getOptionValue(SPLIT_SIZE_OPTION.getOpt()));
+        } else {
+            splitSize = DEFAULT_MAPPER_SPLIT_SIZE;
+        }
+
+        if (cmdLine.hasOption(BATCH_SIZE_OPTION.getOpt())) {
+            // Read the batch size from its own option, not from the split size option.
+            batchSize = Integer.parseInt(cmdLine.getOptionValue(BATCH_SIZE_OPTION.getOpt()));
+        } else {
+            batchSize = DEFAULT_QUERY_BATCH_SIZE;
+        }
+
+        isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
+    }
+
+    /**
+     * @return the name of the job priority chosen by {@link #parseOptions(String[])}
+     */
+    public String getJobPriority() {
+        return this.jobPriority.toString();
+    }
+
+    // Maps "0".."4" to VERY_HIGH..VERY_LOW; a missing or unrecognized value yields NORMAL.
+    private JobPriority getJobPriority(CommandLine cmdLine) {
+        String jobPriorityOption = cmdLine.getOptionValue(JOB_PRIORITY_OPTION.getOpt());
+        if (jobPriorityOption == null) {
+            return JobPriority.NORMAL;
+        }
+
+        switch (jobPriorityOption) {
+            case "0" : return JobPriority.VERY_HIGH;
+            case "1" : return JobPriority.HIGH;
+            case "2" : return JobPriority.NORMAL;
+            case "3" : return JobPriority.LOW;
+            case "4" : return JobPriority.VERY_LOW;
+            default:
+                return JobPriority.NORMAL;
+        }
+    }
+
+    public Job getJob() {
+        return this.job;
+    }
+
+    public boolean isDeletingAllViews() {
+        return this.isDeletingAllViews;
+    }
+
+    public String getTenantId() {
+        return this.tenantId;
+    }
+
+    public String getViewName() {
+        return this.viewName;
+    }
+
+    public int getSplitSize() {
+        return this.splitSize;
+    }
+
+    public int getBatchSize() {
+        return this.batchSize;
+    }
+
+    /**
+     * Parses the raw arguments and records the job priority.
+     *
+     * @param args the raw command line arguments
+     * @return the parsed command line
+     * @throws IllegalStateException if none of the all/view/tenant options is given
+     */
+    public CommandLine parseOptions(String[] args) {
+        final Options options = getOptions();
+        CommandLineParser parser = new PosixParser();
+        CommandLine cmdLine = null;
+        try {
+            cmdLine = parser.parse(options, args);
+        } catch (ParseException e) {
+            printHelpAndExit("Error parsing command line options: " + e.getMessage(),
+                    options);
+        }
+
+        // Handle the help request first so that "-h" alone is not reported as an error.
+        if (cmdLine.hasOption(HELP_OPTION.getOpt())) {
+            printHelpAndExit(options, 0);
+        }
+
+        if (!cmdLine.hasOption(DELETE_ALL_VIEWS_OPTION.getOpt()) &&
+                !cmdLine.hasOption(VIEW_NAME_OPTION.getOpt()) &&
+                !cmdLine.hasOption(TENANT_ID_OPTION.getOpt())) {
+            throw new IllegalStateException("No deletion job is specified, " +
+                    "please indicate deletion job for ALL/VIEW/TENANT level");
+        }
+
+        this.jobPriority = getJobPriority(cmdLine);
+
+        return cmdLine;
+    }
+
+    private Options getOptions() {
+        final Options options = new Options();
+        options.addOption(DELETE_ALL_VIEWS_OPTION);
+        options.addOption(VIEW_NAME_OPTION);
+        options.addOption(TENANT_ID_OPTION);
+        options.addOption(HELP_OPTION);
+        options.addOption(JOB_PRIORITY_OPTION);
+        options.addOption(RUN_FOREGROUND_OPTION);
+        options.addOption(SPLIT_SIZE_OPTION);
+        options.addOption(BATCH_SIZE_OPTION);
+
+        return options;
+    }
+
+    // Reports the error on stderr and in the log, prints the usage and exits with status 1.
+    private void printHelpAndExit(String errorMessage, Options options) {
+        System.err.println(errorMessage);
+        LOGGER.error(errorMessage);
+        printHelpAndExit(options, 1);
+    }
+
+    // Prints the usage and terminates the JVM with the given exit code.
+    private void printHelpAndExit(Options options, int exitCode) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("help", options);
+        System.exit(exitCode);
+    }
+
+    public void setJobName(String jobName) {
+        this.jobName = jobName;
+    }
+
+    /**
+     * @return the explicitly set job name, or a name derived from the deletion scope
+     *         (all views, view name, or tenant id) when none was set
+     */
+    public String getJobName() {
+        if (this.jobName == null) {
+            String jobName;
+            if (this.isDeletingAllViews) {
+                jobName = DELETE_ALL_VIEWS;
+            } else if (this.getViewName() != null) {
+                jobName = this.getViewName();
+            } else {
+                jobName = this.tenantId;
+            }
+            this.jobName = "PhoenixTTLTool-" + jobName + "-";
+        }
+
+        return this.jobName;
+    }
+
+    /**
+     * Copies the deletion scope (all views or a single view, plus the optional tenant id)
+     * into the given job configuration.
+     *
+     * @param configuration the configuration to update
+     */
+    public void setPhoenixTTLJobInputConfig(Configuration configuration) {
+        if (this.isDeletingAllViews) {
+            configuration.set(PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_ALL_VIEWS,
+                    DELETE_ALL_VIEWS);
+        } else if (this.getViewName() != null) {
+            configuration.set(PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_PER_VIEW,
+                    this.viewName);
+        }
+
+        if (this.tenantId != null) {
+            configuration.set(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID, this.tenantId);
+        }
+    }
+
+    /**
+     * Creates the map-only job and wires the input, mapper, output format and priority.
+     *
+     * @throws Exception if the job cannot be created or configured
+     */
+    public void configureJob() throws Exception {
+        this.job = Job.getInstance(getConf(),getJobName());
+        PhoenixMapReduceUtil.setInput(job, this);
+
+        job.setJarByClass(PhoenixTTLTool.class);
+        job.setMapperClass(PhoenixTTLDeleteJobMapper.class);
+        job.setMapOutputKeyClass(NullWritable.class);
+        job.setMapOutputValueClass(NullWritable.class);
+        job.setOutputFormatClass(NullOutputFormat.class);
+        job.setNumReduceTasks(0);
+        job.setPriority(this.jobPriority);
+
+        TableMapReduceUtil.addDependencyJars(job);
+        LOGGER.info("PhoenixTTLTool is running for " + job.getJobName());
+    }
+
+    /**
+     * Runs the configured job: waits for completion in foreground mode, otherwise submits
+     * it and returns immediately.
+     *
+     * @return 0 on success (or successful submission), 1 on failure
+     */
+    public int runJob() {
+        try {
+            if (isForeground) {
+                LOGGER.info("Running PhoenixTTLTool in foreground. " +
+                        "Runs full table scans. This may take a long time!");
+                return (job.waitForCompletion(true)) ? 0 : 1;
+            } else {
+                LOGGER.info("Running PhoenixTTLTool in Background - Submit async and exit");
+                job.submit();
+                return 0;
+            }
+        } catch (Exception e) {
+            // Pass the exception to the logger so the stack trace is not lost.
+            LOGGER.error("Caught exception " + e + " trying to run PhoenixTTLTool.", e);
+            return 1;
+        }
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+        connection = null;
+        int ret;
+        try {
+            parseArgs(args);
+            configuration = HBaseConfiguration.addHbaseResources(getConf());
+            connection = ConnectionUtil.getInputConnection(configuration, new Properties());
+            configureJob();
+            TableMapReduceUtil.initCredentials(job);
+            ret = runJob();
+        } catch (Exception e) {
+            printHelpAndExit(e.getMessage(), getOptions());
+            return -1;
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+        return ret;
+    }
+
+    public static void main(final String[] args) throws Exception {
+        int result = ToolRunner.run(new PhoenixTTLTool(), args);
+        System.exit(result);
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultMultiViewJobStatusTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultMultiViewJobStatusTracker.java
new file mode 100644
index 0000000..6e2f4f1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultMultiViewJobStatusTracker.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable.ViewInfoJobState;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default {@link MultiViewJobStatusTracker}: writes every status update to the debug log
+ * and keeps no state.
+ */
+public class DefaultMultiViewJobStatusTracker implements MultiViewJobStatusTracker {
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(DefaultMultiViewJobStatusTracker.class);
+
+    /**
+     * Logs the status of one view at debug level.
+     *
+     * @param view the view (or view index) entry being processed
+     * @param numberOfDeletedRows row count reported by the deletion scan
+     * @param state a {@link ViewInfoJobState} value
+     * @param config the job configuration (not used)
+     * @param duration elapsed time reported by the caller
+     * @param mrJobName name of the MapReduce job
+     */
+    @Override
+    public void updateJobStatus(ViewInfoTracker view, long numberOfDeletedRows, int state,
+                                Configuration config, long duration, String mrJobName) {
+        if (state == ViewInfoJobState.SUCCEEDED.getValue()) {
+            LOGGER.debug(String.format("Number of deleted rows from view %s, TenantID %s, " +
+                            "and Source Table Name %s : " +
+                            "number of deleted row %d, duration : %d, mr job name : %s.",
+                    view.getViewName(), view.getTenantId(), view.getRelationName(),
+                    numberOfDeletedRows, duration, mrJobName));
+        } else if (state == ViewInfoJobState.DELETED.getValue()) {
+            // The former format string carried a stray %d that was fed the job name and
+            // made String.format throw IllegalFormatConversionException.
+            LOGGER.debug(String.format("View has been deleted, view info : view %s, TenantID %s, " +
+                            "and Source Table Name %s," +
+                            " mr job name : %s.", view.getViewName(), view.getTenantId(),
+                    view.getRelationName(), mrJobName));
+        } else {
+            LOGGER.debug(String.format("Job is in state %d for view %s, TenantID %s, " +
+                            "Source Table Name %s , and duration : %d, " +
+                            "mr job name : %s.", state, view.getViewName(), view.getTenantId(),
+                    view.getRelationName(), duration, mrJobName));
+        }
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultMultiViewSplitStrategy.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultMultiViewSplitStrategy.java
new file mode 100644
index 0000000..00eee09
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultMultiViewSplitStrategy.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.phoenix.mapreduce.PhoenixMultiViewInputSplit;
+
+import java.util.List;
+
+import static org.apache.phoenix.mapreduce.PhoenixTTLTool.DEFAULT_MAPPER_SPLIT_SIZE;
+
+/**
+ * Default split strategy: cuts the view list into consecutive chunks of a fixed size and
+ * wraps each chunk in a {@link PhoenixMultiViewInputSplit}.
+ */
+public class DefaultMultiViewSplitStrategy implements MultiViewSplitStrategy {
+
+    /**
+     * Builds one input split per chunk of views.
+     *
+     * @param views the views to distribute
+     * @param configuration supplies the chunk size; a value below 1 falls back to
+     *                      {@code DEFAULT_MAPPER_SPLIT_SIZE}
+     * @return the splits, in view order
+     */
+    public List<InputSplit> generateSplits(List<ViewInfoWritable> views,
+                                           Configuration configuration) {
+        final int configuredSize = PhoenixConfigurationUtil.getMultiViewSplitSize(configuration);
+        final int chunkSize = configuredSize < 1 ? DEFAULT_MAPPER_SPLIT_SIZE : configuredSize;
+
+        final int mapperCount = getNumberOfMappers(views.size(), chunkSize);
+        final List<InputSplit> splits = Lists.newArrayListWithExpectedSize(mapperCount);
+
+        // Chunk number "mapper" covers [mapper * chunkSize, upper bound).
+        int mapper = 0;
+        while (mapper < mapperCount) {
+            final int from = mapper * chunkSize;
+            final int to = getUpperBound(chunkSize, mapper, views.size());
+            splits.add(new PhoenixMultiViewInputSplit(views.subList(from, to)));
+            mapper++;
+        }
+
+        return splits;
+    }
+
+    /*
+        Number of mappers needed for the given number of views: one per full chunk,
+        plus one more when a partial chunk is left over.
+     */
+    public int getNumberOfMappers(int viewSize, int numViewsInSplit) {
+        final int fullChunks = viewSize / numViewsInSplit;
+        final boolean hasRemainder = viewSize % numViewsInSplit > 0;
+        return hasRemainder ? fullChunks + 1 : fullChunks;
+    }
+
+    /*
+        Exclusive upper index of chunk i. For example, with 10 views per mapper and
+        12 views in total:
+        chunk 0 covers [0 - 10), so this method returns 10;
+        chunk 1 covers [10 - 12), so this method returns 12.
+     */
+    public int getUpperBound(int numViewsInSplit, int i, int viewSize) {
+        return Math.min((i + 1) * numViewsInSplit, viewSize);
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultPhoenixMultiViewListProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultPhoenixMultiViewListProvider.java
new file mode 100644
index 0000000..a9bd79e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultPhoenixMultiViewListProvider.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DefaultPhoenixMultiViewListProvider implements PhoenixMultiViewListProvider {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(DefaultPhoenixMultiViewListProvider.class);
+
+ public List<ViewInfoWritable> getPhoenixMultiViewList(Configuration configuration) {
+ boolean isFetchAll = configuration.get(
+ PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_ALL_VIEWS) != null;
+
+ if (!isFetchAll) {
+ return getTenantOrViewMultiViewList(configuration);
+ }
+ List<ViewInfoWritable> viewInfoWritables = new ArrayList<>();
+ boolean isQueryMore = true;
+ String query = PhoenixMultiInputUtil.getFetchViewQuery(configuration);
+
+ int limit = PhoenixConfigurationUtil.getMultiViewQueryMoreSplitSize(configuration);
+
+ String schema = null;
+ String tableName = configuration.get(
+ PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_PER_VIEW);
+ if (tableName != null) {
+ schema = SchemaUtil.getSchemaNameFromFullName(tableName);
+ }
+ String tenantId = configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
+
+ try (PhoenixConnection connection = (PhoenixConnection)
+ ConnectionUtil.getInputConnection(configuration)){
+ try (PreparedStatement stmt = connection.prepareStatement(query)) {
+ do {
+ stmt.setString(1, tenantId);
+ stmt.setString(2, schema);
+ stmt.setString(3, tableName);
+ stmt.setInt(4, limit);
+
+ ResultSet viewRs = stmt.executeQuery();
+ String fullTableName = null;
+
+ while (viewRs.next()) {
+ tenantId = viewRs.getString(1);
+ schema = viewRs.getString(2);
+ tableName = viewRs.getString(3);
+ fullTableName = tableName;
+ Long viewTtlValue = viewRs.getLong(4);
+
+ if (schema != null && schema.length() > 0) {
+ fullTableName = SchemaUtil.getTableName(schema, tableName);
+ }
+
+ if (!isParentHasTTL(connection, tenantId, fullTableName)) {
+ addingViewIndexToTheFinalList(connection,tenantId,fullTableName,
+ viewTtlValue, viewInfoWritables);
+ }
+ }
+ if (isQueryMore) {
+ if (fullTableName == null) {
+ isQueryMore = false;
+ }
+ }
+ } while (isQueryMore);
+ }
+
+ } catch (Exception e) {
+ LOGGER.error("Getting view info failed with: " + e.getMessage());
+ }
+ return viewInfoWritables;
+ }
+
+ public List<ViewInfoWritable> getTenantOrViewMultiViewList(Configuration configuration) {
+ List<ViewInfoWritable> viewInfoWritables = new ArrayList<>();
+ String query = PhoenixMultiInputUtil.getFetchViewQuery(configuration);
+
+ try (PhoenixConnection connection = (PhoenixConnection)
+ ConnectionUtil.getInputConnection(configuration)) {
+ try (Statement stmt = connection.createStatement()) {
+ ResultSet viewRs = stmt.executeQuery(query);
+ while (viewRs.next()) {
+ String tenantId = viewRs.getString(1);
+ String schema = viewRs.getString(2);
+ String tableName = viewRs.getString(3);
+ Long viewTtlValue = viewRs.getLong(4);
+ String fullTableName = tableName;
+
+ if (schema != null && schema.length() > 0) {
+ fullTableName = SchemaUtil.getTableName(schema, tableName);
+ }
+
+ if (!isParentHasTTL(connection, tenantId, fullTableName)) {
+ addingViewIndexToTheFinalList(connection,tenantId,fullTableName,
+ viewTtlValue, viewInfoWritables);
+ }
+ }
+ }
+ }catch (Exception e) {
+ LOGGER.error("Getting view info failed with: " + e.getMessage());
+ }
+ return viewInfoWritables;
+ }
+
+ private boolean isParentHasTTL(PhoenixConnection connection,
+ String tenantId, String fullTableName) {
+ boolean skip= false;
+ try {
+ PTable pTable = PhoenixRuntime.getTable(connection, tenantId, fullTableName);
+ PTable parentTable = PhoenixRuntime.getTable(connection, null,
+ pTable.getParentName().toString());
+ if (parentTable.getType() == PTableType.VIEW &&
+ parentTable.getPhoenixTTL() > 0) {
+ /* if the current view parent already has a TTL value, we want to
+ skip the current view cleanup job because we want to run the cleanup
+ job for at the GlobalView level instead of running multi-jobs at
+ the LeafView level for the better performance.
+
+ BaseTable
+ GlobalView(has TTL)
+ LeafView1, LeafView2, LeafView3....
+ */
+ skip = true;
+ }
+ } catch (Exception e) {
+ skip = true;
+ LOGGER.error(String.format("Had an issue to process the view: %s, " +
+ "tenantId: see error %s ", fullTableName, tenantId,
+ e.getMessage()));
+ }
+ return skip;
+ }
+
+ private void addingViewIndexToTheFinalList(PhoenixConnection connection, String tenantId,
+ String fullTableName, long viewTtlValue,
+ List<ViewInfoWritable> viewInfoWritables)
+ throws Exception {
+ PTable pTable = PhoenixRuntime.getTable(connection, tenantId, fullTableName);
+ ViewInfoWritable viewInfoTracker = new ViewInfoTracker(
+ tenantId,
+ fullTableName,
+ viewTtlValue,
+ pTable.getPhysicalName().getString(),
+ false
+
+ );
+ viewInfoWritables.add(viewInfoTracker);
+
+ List<PTable> allIndexesOnView = pTable.getIndexes();
+ for (PTable viewIndexTable : allIndexesOnView) {
+ String indexName = viewIndexTable.getTableName().getString();
+ String indexSchema = viewIndexTable.getSchemaName().getString();
+ if (indexName.contains(
+ QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) {
+ indexName = SchemaUtil.getTableNameFromFullName(indexName,
+ QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR);
+ }
+ indexName = SchemaUtil.getTableNameFromFullName(indexName);
+ indexName = SchemaUtil.getTableName(indexSchema, indexName);
+ ViewInfoWritable viewInfoTrackerForIndexEntry = new ViewInfoTracker(
+ tenantId,
+ fullTableName,
+ viewTtlValue,
+ indexName,
+ true
+
+ );
+ viewInfoWritables.add(viewInfoTrackerForIndexEntry);
+ }
+ }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/MultiViewJobStatusTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/MultiViewJobStatusTracker.java
new file mode 100644
index 0000000..7520dbe
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/MultiViewJobStatusTracker.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Callback for reporting the status of the work done for a single view by a multi-view
+ * MapReduce job.
+ *
+ * NOTE(review): the parameter descriptions below are inferred from the parameter names only;
+ * confirm them against the implementations and callers.
+ */
+public interface MultiViewJobStatusTracker {
+ /**
+ * Reports the status of the job for one view.
+ *
+ * @param view the view (or view index) entry the status refers to
+ * @param numberOfDeletedRows presumably the number of rows deleted for {@code view}
+ * @param state numeric job state; presumably a {@code ViewInfoWritable.ViewInfoJobState}
+ * value -- TODO confirm
+ * @param config the job configuration
+ * @param duration presumably the elapsed time of the work; unit not visible here
+ * @param mrJobName name of the MapReduce job
+ */
+ void updateJobStatus(ViewInfoTracker view, long numberOfDeletedRows, int state,
+ Configuration config, long duration, String mrJobName);
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/MultiViewSplitStrategy.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/MultiViewSplitStrategy.java
new file mode 100644
index 0000000..d438005
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/MultiViewSplitStrategy.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import java.util.List;
+
+/**
+ * Strategy that turns the list of views to process into MapReduce input splits.
+ */
+public interface MultiViewSplitStrategy {
+ /**
+ * Builds the input splits for the given views.
+ *
+ * @param views the view entries to distribute over splits
+ * @param configuration the job configuration the strategy may read its settings from
+ * @return the generated input splits
+ */
+ List<InputSplit> generateSplits(List<ViewInfoWritable> views, Configuration configuration);
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 82c1f2f..792b621 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -166,6 +166,27 @@ public final class PhoenixConfigurationUtil {
public static final String MAPREDUCE_JOB_TYPE = "phoenix.mapreduce.jobtype";
+ // Number of views grouped into one input split (i.e. handled by one mapper) of the
+ // multi-view deletion job.
+ public static final String MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE = "phoenix.mapreduce.multi.input.split.size";
+
+ // Batch size for the multi-view job's view metadata queries.
+ // NOTE(review): presumably the page size (LIMIT) of the paged SYSTEM.CATALOG query -- confirm.
+ public static final String MAPREDUCE_MULTI_INPUT_QUERY_BATCH_SIZE = "phoenix.mapreduce.multi.input.batch.size";
+
+ // Full name of the single view the Phoenix TTL deletion job should process.
+ public static final String MAPREDUCE_PHOENIX_TTL_DELETE_JOB_PER_VIEW = "phoenix.mapreduce.phoenix_ttl.per_view";
+
+ // When set (non-null), the Phoenix TTL deletion job processes all views.
+ public static final String MAPREDUCE_PHOENIX_TTL_DELETE_JOB_ALL_VIEWS = "phoenix.mapreduce.phoenix_ttl.all";
+
+ // Class used to inject custom multi-input (view list) logic.
+ // NOTE(review): despite the ".path" suffix of the key, the value appears to be a class name
+ // that is loaded reflectively (an unknown value surfaces as ClassNotFoundException) -- confirm.
+ public static final String MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ = "phoenix.mapreduce.multi.input.strategy.path";
+
+ // Class used to inject custom split logic; same value format as
+ // MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ.
+ public static final String MAPREDUCE_MULTI_INPUT_SPLIT_STRATEGY_CLAZZ = "phoenix.mapreduce.multi.split.strategy.path";
+
+ // Class used to inject custom mapper status-tracking logic; presumably the same value
+ // format as the two keys above -- confirm.
+ public static final String MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ = "phoenix.mapreduce.multi.mapper.tracker.path";
+
+
/**
* Determines type of Phoenix Map Reduce job.
* 1. QUERY allows running arbitrary queries without aggregates
@@ -408,12 +429,28 @@ public final class PhoenixConfigurationUtil {
}
- public static void setUpsertStatement(final Configuration configuration, String upsertStmt) throws SQLException {
- Preconditions.checkNotNull(configuration);
- Preconditions.checkNotNull(upsertStmt);
- configuration.set(UPSERT_STATEMENT, upsertStmt);
- }
-
+ /**
+  * Stores the given UPSERT statement in the configuration under {@code UPSERT_STATEMENT}.
+  *
+  * @param configuration configuration to update
+  * @param upsertStmt    statement text to store
+  * @throws NullPointerException if either argument is null
+  */
+ public static void setUpsertStatement(final Configuration configuration, String upsertStmt) throws SQLException {
+     // checkNotNull returns its argument, so both checks run (in the same order) before set().
+     final Configuration target = Preconditions.checkNotNull(configuration);
+     final String statement = Preconditions.checkNotNull(upsertStmt);
+     target.set(UPSERT_STATEMENT, statement);
+ }
+
+ /**
+  * Stores {@code splitSize} as a decimal string under
+  * {@code MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE}.
+  *
+  * @param configuration configuration to update
+  * @param splitSize     value to store
+  * @throws NullPointerException if {@code configuration} is null
+  */
+ public static void setMultiInputMapperSplitSize(Configuration configuration, final int splitSize) {
+     Preconditions.checkNotNull(configuration)
+             .set(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE, Integer.toString(splitSize));
+ }
+
+ /**
+  * Stores {@code batchSize} as a decimal string under
+  * {@code MAPREDUCE_MULTI_INPUT_QUERY_BATCH_SIZE}.
+  *
+  * @param configuration configuration to update
+  * @param batchSize     value to store
+  * @throws NullPointerException if {@code configuration} is null
+  */
+ public static void setMultiViewQueryMoreSplitSize(Configuration configuration, final int batchSize) {
+     Preconditions.checkNotNull(configuration)
+             .set(MAPREDUCE_MULTI_INPUT_QUERY_BATCH_SIZE, Integer.toString(batchSize));
+ }
+
+ /**
+  * Reads the value stored under {@code MAPREDUCE_MULTI_INPUT_QUERY_BATCH_SIZE}.
+  *
+  * @param configuration configuration to read from
+  * @return the stored batch size
+  * @throws NullPointerException  if {@code configuration} is null or the property is not set
+  * @throws NumberFormatException if the stored value is not a valid integer
+  */
+ public static int getMultiViewQueryMoreSplitSize(final Configuration configuration) {
+     // Same up-front null check as the setters, with a message that names the problem.
+     Preconditions.checkNotNull(configuration, "configuration must not be null");
+     final String batchSize = configuration.get(MAPREDUCE_MULTI_INPUT_QUERY_BATCH_SIZE);
+     Preconditions.checkNotNull(batchSize, "%s is not set in the configuration",
+             MAPREDUCE_MULTI_INPUT_QUERY_BATCH_SIZE);
+     // parseInt yields the primitive directly; valueOf would box and immediately unbox.
+     return Integer.parseInt(batchSize);
+ }
+
public static List<ColumnInfo> getSelectColumnMetadataList(final Configuration configuration) throws SQLException {
Preconditions.checkNotNull(configuration);
List<ColumnInfo> columnMetadataList = null;
@@ -438,6 +475,12 @@ public final class PhoenixConfigurationUtil {
return columnMetadataList;
}
+ /**
+  * Reads the value stored under {@code MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE}.
+  *
+  * @param configuration configuration to read from
+  * @return the stored split size
+  * @throws NullPointerException  if {@code configuration} is null or the property is not set
+  * @throws NumberFormatException if the stored value is not a valid integer
+  */
+ public static int getMultiViewSplitSize(final Configuration configuration) {
+     // Same up-front null check as the setters, with a message that names the problem.
+     Preconditions.checkNotNull(configuration, "configuration must not be null");
+     final String splitSize = configuration.get(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE);
+     Preconditions.checkNotNull(splitSize, "%s is not set in the configuration",
+             MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE);
+     // parseInt yields the primitive directly; valueOf would box and immediately unbox.
+     return Integer.parseInt(splitSize);
+ }
+
private static List<String> getSelectColumnList(
final Configuration configuration) {
List<String> selectColumnList = PhoenixConfigurationUtil.getSelectColumnNames(configuration);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
index 6c23fd9..cab2361 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
@@ -23,7 +23,9 @@ import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.phoenix.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.mapreduce.PhoenixMultiViewInputFormat;
import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
+import org.apache.phoenix.mapreduce.PhoenixTTLTool;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
import java.io.IOException;
@@ -127,6 +129,21 @@ public final class PhoenixMapReduceUtil {
/**
*
+ * @param job MR job instance
+ * @param tool PhoenixTtlTool for Phoenix TTL deletion MR job
+ */
+ public static void setInput(final Job job, PhoenixTTLTool tool) {
+     final Configuration jobConf = job.getConfiguration();
+
+     // Views are read through the multi-view input format rather than a table scan.
+     job.setInputFormatClass(PhoenixMultiViewInputFormat.class);
+
+     // Let the tool record its own input settings first, then add the generic ones.
+     tool.setPhoenixTTLJobInputConfig(jobConf);
+     PhoenixConfigurationUtil.setSchemaType(jobConf, SchemaType.QUERY);
+     PhoenixConfigurationUtil.setMultiInputMapperSplitSize(jobConf, tool.getSplitSize());
+     PhoenixConfigurationUtil.setMultiViewQueryMoreSplitSize(jobConf, tool.getBatchSize());
+ }
+
+ /**
+ *
* @param job
* @param inputClass DBWritable class
* @param snapshotName The name of a snapshot (of a table) to read from
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMultiInputUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMultiInputUtil.java
new file mode 100644
index 0000000..d25e0e3
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMultiInputUtil.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.mapreduce.PhoenixTTLTool;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.StringUtil;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+
+
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
+
+/**
+ * Helpers that build the SYSTEM.CATALOG queries (and tenant connections) used to discover
+ * the views a multi-view MapReduce job has to process.
+ */
+public class PhoenixMultiInputUtil {
+    /**
+     * Base query: every view in SYSTEM.CATALOG whose PHOENIX_TTL is not null and greater than
+     * {@code PHOENIX_TTL_NOT_DEFINED}, and whose view type is not MAPPED.
+     */
+    public static final String SELECT_ALL_VIEW_METADATA_FROM_SYSCAT_QUERY =
+            "SELECT TENANT_ID, TABLE_SCHEM, TABLE_NAME, PHOENIX_TTL FROM " +
+                    SYSTEM_CATALOG_NAME + " WHERE " +
+                    TABLE_TYPE + " = '" + PTableType.VIEW.getSerializedValue() + "' AND " +
+                    PHOENIX_TTL + " IS NOT NULL AND " +
+                    PHOENIX_TTL + " > " + PHOENIX_TTL_NOT_DEFINED + " AND " +
+                    VIEW_TYPE + " <> " + PTable.ViewType.MAPPED.getSerializedValue();
+
+    /**
+     * Opens a connection with the tenant id connection property set.
+     *
+     * @param url      JDBC url
+     * @param tenantId tenant id; must not be null ({@code Properties.setProperty} rejects
+     *                 null values with a {@link NullPointerException})
+     * @return the new connection; the caller is responsible for closing it
+     * @throws SQLException if the connection cannot be established
+     */
+    public static Connection buildTenantConnection(String url, String tenantId)
+            throws SQLException {
+        Properties props = new Properties();
+        props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        return DriverManager.getConnection(url, props);
+    }
+
+    /**
+     * @return the base query extended with a row-value-constructor lower bound on
+     *         (TENANT_ID, TABLE_SCHEM, TABLE_NAME) and a LIMIT; it has four bind parameters
+     */
+    public static String getSelectAllPageQuery() {
+        return SELECT_ALL_VIEW_METADATA_FROM_SYSCAT_QUERY + " AND " +
+                "(TENANT_ID,TABLE_SCHEM,TABLE_NAME) > (?,?,?) LIMIT ?";
+    }
+
+    /**
+     * Builds the metadata query for one view and/or one tenant.
+     *
+     * <ul>
+     * <li>{@code fullName} equal to {@code PhoenixTTLTool.DELETE_ALL_VIEWS}: the unrestricted
+     *     base query is returned and {@code tenantId} is ignored.</li>
+     * <li>{@code fullName} non-null otherwise: restricted to that schema (or a NULL schema
+     *     when the name has none) and table name.</li>
+     * <li>{@code tenantId} null or empty: restricted to rows with a NULL TENANT_ID,
+     *     otherwise to the given tenant.</li>
+     * </ul>
+     *
+     * Schema, view and tenant values are embedded as SQL string literals with embedded
+     * single quotes doubled, so a quote inside a value cannot terminate the literal.
+     *
+     * @param fullName full view name, {@code DELETE_ALL_VIEWS}, or null for "any view"
+     * @param tenantId tenant id, or null/empty for global views
+     * @return the query text
+     */
+    public static String constructViewMetadataQueryBasedOnView(String fullName, String tenantId) {
+        StringBuilder query = new StringBuilder(SELECT_ALL_VIEW_METADATA_FROM_SYSCAT_QUERY);
+
+        if (fullName != null) {
+            if (fullName.equals(PhoenixTTLTool.DELETE_ALL_VIEWS)) {
+                return query.toString();
+            }
+
+            String schema = SchemaUtil.getSchemaNameFromFullName(fullName);
+            String viewName = SchemaUtil.getTableNameFromFullName(fullName);
+
+            if (!schema.equals(StringUtil.EMPTY_STRING)) {
+                query.append(" AND TABLE_SCHEM = ").append(toSqlStringLiteral(schema));
+            } else {
+                query.append(" AND TABLE_SCHEM IS NULL");
+            }
+
+            query.append(" AND TABLE_NAME = ").append(toSqlStringLiteral(viewName));
+        }
+
+        if (tenantId != null && tenantId.length() > 0) {
+            query.append(" AND TENANT_ID = ").append(toSqlStringLiteral(tenantId));
+        } else {
+            query.append(" AND TENANT_ID IS NULL");
+        }
+
+        return query.toString();
+    }
+
+    /**
+     * @param tenant tenant id, or null/empty for global views
+     * @return the metadata query for all views of {@code tenant}
+     */
+    public static String constructViewMetadataQueryBasedOnTenant(String tenant) {
+        return constructViewMetadataQueryBasedOnView(null, tenant);
+    }
+
+    /**
+     * Picks the view metadata query matching the job configuration: the paged select-all
+     * query when the "all views" key is set; the per-tenant query when only a tenant id is
+     * set; otherwise the per-view query built from the view name and tenant id keys.
+     *
+     * @param configuration job configuration
+     * @return the query text
+     */
+    public static String getFetchViewQuery(Configuration configuration) {
+        final String allViews = configuration.get(
+                PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_ALL_VIEWS);
+        final String tenantId = configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
+        final String viewName = configuration.get(
+                PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_PER_VIEW);
+
+        if (allViews != null) {
+            return getSelectAllPageQuery();
+        }
+        if (tenantId != null && viewName == null) {
+            return constructViewMetadataQueryBasedOnTenant(tenantId);
+        }
+        return constructViewMetadataQueryBasedOnView(viewName, tenantId);
+    }
+
+    /** Wraps {@code value} in single quotes, doubling any single quote it contains. */
+    private static String toSqlStringLiteral(String value) {
+        return "'" + value.replace("'", "''") + "'";
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMultiViewListProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMultiViewListProvider.java
new file mode 100644
index 0000000..154f47f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMultiViewListProvider.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+import java.util.List;
+
+public interface PhoenixMultiViewListProvider {
+ List<ViewInfoWritable> getPhoenixMultiViewList(Configuration configuration);
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ViewInfoTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ViewInfoTracker.java
new file mode 100644
index 0000000..a4f3226
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ViewInfoTracker.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.io.WritableUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Writable description of one unit of work for a multi-view job: a view, its TTL, and the
+ * relation (the view's physical table or one of its indexes) that unit refers to.
+ */
+public class ViewInfoTracker implements ViewInfoWritable {
+
+    String tenantId;
+    String viewName;
+    String relationName;
+    long phoenixTtl;
+    boolean isIndexRelation;
+
+    /** No-arg constructor; fields are populated later through {@link #readFields(DataInput)}. */
+    public ViewInfoTracker() {
+
+    }
+
+    /**
+     * @param tenantId        tenant id, may be null
+     * @param viewName        name of the view
+     * @param phoenixTtl      TTL value of the view
+     * @param relationName    name of the index or data table this entry refers to
+     * @param isIndexRelation true when {@code relationName} names an index
+     */
+    public ViewInfoTracker(String tenantId, String viewName, long phoenixTtl,
+                           String relationName, boolean isIndexRelation) {
+        this.tenantId = tenantId;
+        this.viewName = viewName;
+        this.phoenixTtl = phoenixTtl;
+        this.relationName = relationName;
+        this.isIndexRelation = isIndexRelation;
+    }
+
+    @Override
+    public String getTenantId() {
+        return tenantId;
+    }
+
+    @Override
+    public String getViewName() {
+        return viewName;
+    }
+
+    @Override
+    public String getRelationName() {
+        return relationName;
+    }
+
+    @Override
+    public boolean isIndexRelation() {
+        return this.isIndexRelation;
+    }
+
+    public long getPhoenixTtl() {
+        return phoenixTtl;
+    }
+
+    /**
+     * Serializes the fields in the order tenantId, viewName, phoenixTtl, relationName,
+     * isIndexRelation; the boolean is written as the string "true" or "false".
+     */
+    @Override
+    public void write(DataOutput output) throws IOException {
+        WritableUtils.writeString(output, tenantId);
+        WritableUtils.writeString(output, viewName);
+        WritableUtils.writeVLong(output, phoenixTtl);
+        WritableUtils.writeString(output, relationName);
+        WritableUtils.writeString(output, Boolean.toString(isIndexRelation));
+    }
+
+    /**
+     * Reads the fields in the order written by {@link #write(DataOutput)}. Every field is
+     * overwritten, including tenantId when the incoming value is null, so an instance that is
+     * reused for several records never keeps the tenant of the record it held before.
+     */
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        tenantId = WritableUtils.readString(input);
+        viewName = WritableUtils.readString(input);
+        phoenixTtl = WritableUtils.readVLong(input);
+        relationName = WritableUtils.readString(input);
+        isIndexRelation = "true".equals(WritableUtils.readString(input));
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("ViewName:").append(this.viewName);
+        if (this.tenantId != null) {
+            sb.append(", Tenant:").append(this.tenantId);
+        }
+        // relationName is labelled according to what kind of relation it names.
+        sb.append(this.isIndexRelation ? ", IndexName:" : ", BaseTableName:")
+                .append(this.relationName);
+        return sb.toString();
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ViewInfoWritable.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ViewInfoWritable.java
new file mode 100644
index 0000000..48a08e2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ViewInfoWritable.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A {@link Writable} describing one view-related unit of work: the tenant, the view, and the
+ * relation (index or data table) the work applies to.
+ */
+public interface ViewInfoWritable extends Writable {
+ /**
+ * Job states for a view, each paired with a fixed numeric code (1-6).
+ * NOTE(review): presumably the code passed as the int "state" of
+ * MultiViewJobStatusTracker#updateJobStatus -- confirm.
+ */
+ public enum ViewInfoJobState {
+ INITIALIZED(1),
+ RUNNING(2),
+ SUCCEEDED(3),
+ FAILED(4),
+ KILLED(5),
+ DELETED(6);
+
+ // numeric code of this state
+ int value;
+
+ ViewInfoJobState(int value) {
+ this.value = value;
+ }
+
+ /** @return the numeric code of this state */
+ public int getValue() {
+ return this.value;
+ }
+ }
+
+ // write/readFields re-declare the methods already inherited from Writable.
+ void write(DataOutput output) throws IOException;
+ void readFields(DataInput input) throws IOException;
+ /** @return the tenant id of the view */
+ String getTenantId();
+ /** @return the name of the view */
+ String getViewName();
+ /** @return the name of the relation this entry refers to: an index or a data table */
+ String getRelationName(); // from index or data table
+ /** @return true when {@link #getRelationName()} names an index */
+ boolean isIndexRelation();
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 893f5d8..dab3a96 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -336,7 +336,6 @@ public interface QueryServices extends SQLCloseable {
// Flag indicating that server side masking of ttl expired rows is enabled.
public static final String PHOENIX_TTL_SERVER_SIDE_MASKING_ENABLED = "phoenix.ttl.server_side.masking.enabled";
-
// Before 4.15 when we created a view we included the parent table column metadata in the view
// metadata. After PHOENIX-3534 we allow SYSTEM.CATALOG to split and no longer store the parent
// table column metadata along with the child view metadata. When we resolve a child view, we
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/DefaultMultiViewSplitStrategyTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/DefaultMultiViewSplitStrategyTest.java
new file mode 100644
index 0000000..3db85de
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/DefaultMultiViewSplitStrategyTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.phoenix.mapreduce.util.DefaultMultiViewSplitStrategy;
+import org.apache.phoenix.mapreduce.util.ViewInfoTracker;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable;
+import org.apache.phoenix.query.BaseTest;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for the split arithmetic of {@link DefaultMultiViewSplitStrategy}: upper bound of
+ * a split, number of mappers, and the number of generated splits.
+ */
+public class DefaultMultiViewSplitStrategyTest extends BaseTest {
+ DefaultMultiViewSplitStrategy defaultMultiViewSplitStrategy =
+ new DefaultMultiViewSplitStrategy();
+ @Test
+ public void testGetUpperBound() {
+ // given split policy to be 10 with view size 12
+ // we expect 2 mappers with range [0,10) and [10,12)
+ assertEquals(10,
+ defaultMultiViewSplitStrategy.getUpperBound(10, 0, 12));
+ assertEquals(12,
+ defaultMultiViewSplitStrategy.getUpperBound(10, 1, 12));
+
+ // given split policy to be 8 with view size 12
+ // we expect 2 mappers with range [0,8) and [8,12)
+ assertEquals(8,
+ defaultMultiViewSplitStrategy.getUpperBound(8, 0, 12));
+ assertEquals(12,
+ defaultMultiViewSplitStrategy.getUpperBound(8, 1, 12));
+
+ // given split policy to be 5 with view size 1
+ // we expect 1 mapper with range [0,1)
+ assertEquals(1,
+ defaultMultiViewSplitStrategy.getUpperBound(5, 0, 1));
+ }
+
+ @Test
+ public void testGetNumberOfMappers() {
+ int viewSize = 0;
+ int numViewsInSplit = 10;
+
+ // test empty cluster, i.e. view size is 0
+ assertEquals(0,
+ defaultMultiViewSplitStrategy.getNumberOfMappers(viewSize,numViewsInSplit));
+
+ viewSize = 9;
+ // test viewSize is less than numViewsInSplit
+ assertEquals(1,
+ defaultMultiViewSplitStrategy.getNumberOfMappers(viewSize,numViewsInSplit));
+
+ // test viewSize is equal to numViewsInSplit
+ viewSize = 10;
+ assertEquals(1,
+ defaultMultiViewSplitStrategy.getNumberOfMappers(viewSize,numViewsInSplit));
+
+ // test viewSize is greater than numViewsInSplit
+ viewSize = 11;
+ assertEquals(2,
+ defaultMultiViewSplitStrategy.getNumberOfMappers(viewSize,numViewsInSplit));
+ }
+
+ @Test
+ public void testGenerateSplits() {
+ // test number of views greater than split policy
+ testGenerateSplits(11, 10, 2);
+
+ // test number of views equal to split policy
+ testGenerateSplits(10, 10, 1);
+
+ // test number of views less than split policy
+ testGenerateSplits(8, 10, 1);
+
+ // test number of views is 0
+ testGenerateSplits(0, 10, 0);
+
+ // test split policy is 0: all 8 views are expected to end up in a single split
+ testGenerateSplits(8, 0, 1);
+ }
+
+ // Builds numberOfViews empty view entries, stores splitPolicy as the configured split size,
+ // and checks how many splits the strategy generates.
+ private void testGenerateSplits(int numberOfViews, int splitPolicy, int expectedResultSize) {
+ List<ViewInfoWritable> views = new ArrayList<>();
+ for (int i = 0; i < numberOfViews; i++) {
+ views.add(new ViewInfoTracker());
+ }
+ config.set(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE, String.valueOf(splitPolicy));
+ List<InputSplit> result = defaultMultiViewSplitStrategy.generateSplits(views, config);
+ assertEquals(expectedResultSize, result.size());
+ }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormatTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormatTest.java
new file mode 100644
index 0000000..89a5c5a
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormatTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobContext;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static junit.framework.TestCase.assertTrue;
+import static junit.framework.TestCase.fail;
+import static org.apache.phoenix.mapreduce.util.
+ PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE;
+import static org.apache.phoenix.mapreduce.util.
+ PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_SPLIT_STRATEGY_CLAZZ;
+import static org.apache.phoenix.mapreduce.util.
+ PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests how {@link PhoenixMultiViewInputFormat#getSplits} reacts to the default configuration
+ * and to strategy keys that name a class which does not exist.
+ */
+public class PhoenixMultiViewInputFormatTest {
+
+    @Test
+    public void testDefaultConfig() throws Exception {
+        // default run should not raise error
+        new PhoenixMultiViewInputFormat().getSplits(mockJobContext(null));
+    }
+
+    @Test
+    public void testCustomizedInputStrategyClassNotExists() {
+        assertGetSplitsReportsClassNotFound(MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ);
+    }
+
+    @Test
+    public void testCustomizedInputSplitClassNotExists() {
+        assertGetSplitsReportsClassNotFound(MAPREDUCE_MULTI_INPUT_SPLIT_STRATEGY_CLAZZ);
+    }
+
+    /**
+     * Creates a mocked job context whose configuration has the split size set to "10" and,
+     * when {@code strategyKey} is non-null, that key pointing at the bogus class "dummy.path".
+     */
+    private static JobContext mockJobContext(String strategyKey) {
+        Configuration conf = new Configuration();
+        conf.set(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE, "10");
+        if (strategyKey != null) {
+            conf.set(strategyKey, "dummy.path");
+        }
+        JobContext context = Mockito.mock(JobContext.class);
+        when(context.getConfiguration()).thenReturn(conf);
+        return context;
+    }
+
+    /** Expects getSplits to throw an exception whose message mentions ClassNotFoundException. */
+    private static void assertGetSplitsReportsClassNotFound(String strategyKey) {
+        PhoenixMultiViewInputFormat inputFormat = new PhoenixMultiViewInputFormat();
+        JobContext context = mockJobContext(strategyKey);
+        try {
+            inputFormat.getSplits(context);
+            fail();
+        } catch (Exception e) {
+            assertTrue(e.getMessage().contains("ClassNotFoundException"));
+        }
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewReaderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewReaderTest.java
new file mode 100644
index 0000000..0e439ee
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewReaderTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.phoenix.mapreduce.util.ViewInfoTracker;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.when;
+
+/**
+ * Verifies that {@link PhoenixMultiViewReader} hands out the entries of its input split one
+ * by one, in order, and reports exhaustion afterwards.
+ */
+public class PhoenixMultiViewReaderTest {
+
+    @Test
+    public void test() throws Exception {
+        final String tenantId = "Tenant1";
+        final String viewName = "viewName1";
+        final long ttl = 1;
+        final String indexTable = "indexTable";
+        final String globalView = "globalView";
+
+        PhoenixMultiViewInputSplit split = Mockito.mock(PhoenixMultiViewInputSplit.class);
+        TaskAttemptContext context = Mockito.mock(TaskAttemptContext.class);
+
+        // Two entries for the same view: its data relation first, then an index relation.
+        List<ViewInfoWritable> entries = new ArrayList<>();
+        entries.add(new ViewInfoTracker(tenantId, viewName, ttl, globalView, false));
+        entries.add(new ViewInfoTracker(tenantId, viewName, ttl, indexTable, true));
+        when(split.getViewInfoTrackerList()).thenReturn(entries);
+
+        PhoenixMultiViewReader reader = new PhoenixMultiViewReader();
+        reader.initialize(split, context);
+
+        assertNextEntry(reader, tenantId, viewName, ttl, false);
+        assertNextEntry(reader, tenantId, viewName, ttl, true);
+
+        // Reader is exhausted: no further key/value and a null current value.
+        assertFalse(reader.nextKeyValue());
+        ViewInfoTracker afterEnd = (ViewInfoTracker) reader.getCurrentValue();
+        assertNull(reader.getCurrentValue());
+    }
+
+    /** Advances the reader by one entry and checks the fields of the current value. */
+    private static void assertNextEntry(PhoenixMultiViewReader reader, String tenantId,
+            String viewName, long ttl, boolean indexRelation) throws Exception {
+        assertTrue(reader.nextKeyValue());
+        ViewInfoTracker current = (ViewInfoTracker) reader.getCurrentValue();
+        assertEquals(tenantId, current.getTenantId());
+        assertEquals(viewName, current.getViewName());
+        assertEquals(ttl, current.getPhoenixTtl());
+        assertEquals(indexRelation, current.isIndexRelation());
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixTTLToolTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixTTLToolTest.java
new file mode 100644
index 0000000..a453283
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixTTLToolTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.phoenix.query.BaseTest;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class PhoenixTTLToolTest extends BaseTest {
+ String viewName = generateUniqueName();
+ String tenantId = generateUniqueName();
+
+ @Test
+ public void testParseInput() {
+ PhoenixTTLTool tool = new PhoenixTTLTool();
+ tool.parseArgs(new String[] {"-a"});
+
+ assertEquals("NORMAL", tool.getJobPriority());
+ assertEquals(true, tool.isDeletingAllViews());
+ assertEquals(null, tool.getViewName());
+ assertEquals(null, tool.getTenantId());
+
+ tool = new PhoenixTTLTool();
+ tool.parseArgs(new String[] {"-v", viewName, "-i",tenantId });
+ assertEquals("NORMAL", tool.getJobPriority());
+ assertEquals(false, tool.isDeletingAllViews());
+ assertEquals(viewName, tool.getViewName());
+ assertEquals(tenantId, tool.getTenantId());
+
+ tool = new PhoenixTTLTool();
+ tool.parseArgs(new String[] {"-v", viewName, "-p", "0"});
+ assertEquals("VERY_HIGH", tool.getJobPriority());
+ assertEquals(false, tool.isDeletingAllViews());
+ assertEquals(viewName, tool.getViewName());
+ assertEquals(null, tool.getTenantId());
+
+ tool = new PhoenixTTLTool();
+ tool.parseArgs(new String[] {"-v", viewName, "-p", "-1"});
+ assertEquals("NORMAL", tool.getJobPriority());
+ assertEquals(false, tool.isDeletingAllViews());
+ assertEquals(viewName, tool.getViewName());
+ assertEquals(null, tool.getTenantId());
+
+ tool = new PhoenixTTLTool();
+ tool.parseArgs(new String[] {"-v", viewName, "-p", "DSAFDAS"});
+ assertEquals("NORMAL", tool.getJobPriority());
+ assertEquals(false, tool.isDeletingAllViews());
+ assertEquals(viewName, tool.getViewName());
+ assertEquals(null, tool.getTenantId());
+
+ tool = new PhoenixTTLTool();
+ tool.parseArgs(new String[] {"-i", tenantId});
+ assertEquals("NORMAL", tool.getJobPriority());
+ assertEquals(false, tool.isDeletingAllViews());
+ assertEquals(null, tool.getViewName());
+ assertEquals(tenantId, tool.getTenantId());
+ }
+
+ @Test (expected = IllegalStateException.class)
+ public void testNoInputParam() {
+ PhoenixTTLTool tool;
+ tool = new PhoenixTTLTool();
+ tool.parseOptions(new String[] {});
+ }
+}
\ No newline at end of file