You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by an...@apache.org on 2016/07/12 17:43:15 UTC
phoenix git commit: PHOENIX-3045 Data regions in transition forever
if RS holding them down during drop index
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-0.98 45f0004e0 -> a67f74d58
PHOENIX-3045 Data regions in transition forever if RS holding them down during drop index
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a67f74d5
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a67f74d5
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a67f74d5
Branch: refs/heads/4.x-HBase-0.98
Commit: a67f74d58e80a19acbb81dc266bab1fffea9cfc3
Parents: 45f0004
Author: Ankit Singhal <an...@gmail.com>
Authored: Tue Jul 12 23:12:52 2016 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Tue Jul 12 23:12:52 2016 +0530
----------------------------------------------------------------------
.../org/apache/phoenix/hbase/index/Indexer.java | 15 +--
.../phoenix/hbase/index/write/IndexWriter.java | 14 +-
.../hbase/index/write/RecoveryIndexWriter.java | 134 +++++++++++++++++++
.../phoenix/iterate/BaseResultIterators.java | 5 +-
.../stats/StatisticsCollectorFactory.java | 21 +--
.../phoenix/schema/stats/StatisticsUtil.java | 27 ++++
6 files changed, 176 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a67f74d5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 0aed2a6..2956470 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
@@ -63,13 +62,13 @@ import org.apache.phoenix.hbase.index.write.IndexFailurePolicy;
import org.apache.phoenix.hbase.index.write.IndexWriter;
import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache;
import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy;
-import org.apache.phoenix.hbase.index.write.recovery.TrackingParallelWriterIndexCommitter;
import org.apache.phoenix.trace.TracingUtils;
import org.apache.phoenix.trace.util.NullSpan;
import org.cloudera.htrace.Span;
import org.cloudera.htrace.Trace;
import org.cloudera.htrace.TraceScope;
-
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.phoenix.hbase.index.write.RecoveryIndexWriter;
import com.google.common.collect.Multimap;
/**
@@ -152,11 +151,6 @@ public class Indexer extends BaseRegionObserver {
// setup the actual index writer
this.writer = new IndexWriter(env, serverName + "-index-writer");
-
- // setup the recovery writer that does retries on the failed edits
- TrackingParallelWriterIndexCommitter recoveryCommmiter =
- new TrackingParallelWriterIndexCommitter();
-
try {
// get the specified failure policy. We only ever override it in tests, but we need to do it
// here
@@ -165,10 +159,9 @@ public class Indexer extends BaseRegionObserver {
StoreFailuresInCachePolicy.class, IndexFailurePolicy.class);
IndexFailurePolicy policy =
policyClass.getConstructor(PerRegionIndexWriteCache.class).newInstance(failedIndexEdits);
- LOG.debug("Setting up recovery writter with committer: " + recoveryCommmiter.getClass()
- + " and failure policy: " + policy.getClass());
+ LOG.debug("Setting up recovery writter with failure policy: " + policy.getClass());
recoveryWriter =
- new IndexWriter(recoveryCommmiter, policy, env, serverName + "-recovery-writer");
+ new RecoveryIndexWriter(policy, env, serverName + "-recovery-writer");
} catch (Exception ex) {
throw new IOException("Could not instantiate recovery failure policy!", ex);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a67f74d5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
index cbcec3b..831aa16 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
@@ -30,13 +30,13 @@ import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Pair;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
import org.apache.phoenix.hbase.index.exception.IndexWriteException;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
/**
* Do the actual work of writing to the index tables. Ensures that if we do fail to write to the
* index table that we cleanly kill the region/server to ensure that the region's WAL gets replayed.
@@ -171,11 +171,11 @@ public class IndexWriter implements Stoppable {
write(resolveTableReferences(toWrite), false);
}
- public void write(Collection<Pair<Mutation, byte[]>> toWrite, boolean allowLocalUpdates) throws IndexWriteException {
+ public void write(Collection<Pair<Mutation, byte[]>> toWrite, boolean allowLocalUpdates) throws IOException {
write(resolveTableReferences(toWrite), allowLocalUpdates);
}
-
- /**
+
+ /**
* see {@link #write(Collection)}
* @param toWrite
* @throws IndexWriteException
@@ -190,7 +190,7 @@ public class IndexWriter implements Stoppable {
* @param indexUpdates from the index builder
* @return pairs that can then be written by an {@link IndexWriter}.
*/
- public static Multimap<HTableInterfaceReference, Mutation> resolveTableReferences(
+ protected Multimap<HTableInterfaceReference, Mutation> resolveTableReferences(
Collection<Pair<Mutation, byte[]>> indexUpdates) {
Multimap<HTableInterfaceReference, Mutation> updates = ArrayListMultimap
.<HTableInterfaceReference, Mutation> create();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a67f74d5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
new file mode 100644
index 0000000..be542bb
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.write.recovery.TrackingParallelWriterIndexCommitter;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * Used to recover failed index edits during WAL replay
+ * <p>
+ * We attempt to do the index updates in parallel using a backing threadpool. All threads are daemon threads, so it will
+ * not block the region from shutting down.
+ */
+public class RecoveryIndexWriter extends IndexWriter {
+
+ private static final Log LOG = LogFactory.getLog(RecoveryIndexWriter.class);
+ private Set<HTableInterfaceReference> nonExistingTablesList = new HashSet<HTableInterfaceReference>();
+ private HBaseAdmin admin;
+
+ /**
+ * Directly specify the {@link IndexCommitter} and {@link IndexFailurePolicy}. Both are expected to be fully setup
+ * before calling.
+ *
+ * @param committer
+ * @param policy
+ * @param env
+ * @throws IOException
+ * @throws ZooKeeperConnectionException
+ * @throws MasterNotRunningException
+ */
+ public RecoveryIndexWriter(IndexFailurePolicy policy, RegionCoprocessorEnvironment env, String name)
+ throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
+ super(new TrackingParallelWriterIndexCommitter(), policy, env, name);
+ this.admin = new HBaseAdmin(env.getConfiguration());
+ }
+
+ @Override
+ public void write(Collection<Pair<Mutation, byte[]>> toWrite, boolean allowLocalUpdates) throws IOException {
+ try {
+ write(resolveTableReferences(toWrite), allowLocalUpdates);
+ } catch (MultiIndexWriteFailureException e) {
+ for (HTableInterfaceReference table : e.getFailedTables()) {
+ if (!admin.tableExists(table.getTableName())) {
+ LOG.warn("Failure due to non existing table: " + table.getTableName());
+ nonExistingTablesList.add(table);
+ } else {
+ throw e;
+ }
+ }
+ }
+ }
+
+ /**
+ * Convert the passed index updates to {@link HTableInterfaceReference}s.
+ *
+ * @param indexUpdates
+ * from the index builder
+ * @return pairs that can then be written by an {@link RecoveryIndexWriter}.
+ */
+ @Override
+ protected Multimap<HTableInterfaceReference, Mutation> resolveTableReferences(
+ Collection<Pair<Mutation, byte[]>> indexUpdates) {
+ Multimap<HTableInterfaceReference, Mutation> updates = ArrayListMultimap
+ .<HTableInterfaceReference, Mutation> create();
+
+ // simple map to make lookups easy while we build the map of tables to create
+ Map<ImmutableBytesPtr, HTableInterfaceReference> tables = new HashMap<ImmutableBytesPtr, HTableInterfaceReference>(
+ updates.size());
+ for (Pair<Mutation, byte[]> entry : indexUpdates) {
+ byte[] tableName = entry.getSecond();
+ ImmutableBytesPtr ptr = new ImmutableBytesPtr(tableName);
+ HTableInterfaceReference table = tables.get(ptr);
+ if (nonExistingTablesList.contains(table)) {
+ LOG.debug("Edits found for non existing table: " + table.getTableName() + " so skipping it!!");
+ continue;
+ }
+ if (table == null) {
+ table = new HTableInterfaceReference(ptr);
+ tables.put(ptr, table);
+ }
+ updates.put(table, entry.getFirst());
+
+ }
+ return updates;
+ }
+
+ @Override
+ public void stop(String why) {
+ super.stop(why);
+ if (admin != null) {
+ try {
+ admin.close();
+ } catch (IOException e) {
+ // closing silently
+ }
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a67f74d5/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 4a797b8..c2a97b5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -49,6 +49,7 @@ import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.PageFilter;
@@ -80,11 +81,11 @@ import org.apache.phoenix.schema.PColumnFamily;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTable.ViewType;
-import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.stats.GuidePostsInfo;
import org.apache.phoenix.schema.stats.PTableStats;
+import org.apache.phoenix.schema.stats.StatisticsUtil;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.LogUtil;
@@ -360,7 +361,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
if (null == currentSCN) {
currentSCN = HConstants.LATEST_TIMESTAMP;
}
- tableStats = useStats() && table.getType() != PTableType.SYSTEM
+ tableStats = useStats() && StatisticsUtil.isStatsEnabled(TableName.valueOf(physicalTableName))
? context.getConnection().getQueryServices().getTableStats(physicalTableName, currentSCN)
: PTableStats.EMPTY_STATS;
// Used to tie all the scans together during logging
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a67f74d5/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java
index 30c560a..1c65f09 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java
@@ -18,15 +18,9 @@
package org.apache.phoenix.schema.stats;
import java.io.IOException;
-import java.util.Set;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.util.SchemaUtil;
-
-import com.google.common.collect.Sets;
/**
* Provides new {@link DefaultStatisticsCollector} instances based on configuration settings for a
@@ -56,19 +50,6 @@ public class StatisticsCollectorFactory {
}
}
- // TODO: make this declarative through new DISABLE_STATS column on SYSTEM.CATALOG table.
- // Also useful would be a USE_CURRENT_TIME_FOR_STATS column on SYSTEM.CATALOG table.
- private static final Set<TableName> DISABLE_STATS = Sets.newHashSetWithExpectedSize(3);
- static {
- DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME));
- DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME));
- DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME));
- DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME));
- DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,true));
- DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,true));
- DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,true));
- DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,true));
- }
/**
* Determines if statistics are enabled (which is the default). This is done on the
@@ -78,7 +59,7 @@ public class StatisticsCollectorFactory {
*/
private static boolean statisticsEnabled(RegionCoprocessorEnvironment env) {
return env.getConfiguration().getBoolean(QueryServices.STATS_ENABLED_ATTRIB, true) &&
- !DISABLE_STATS.contains(env.getRegionInfo().getTable());
+ StatisticsUtil.isStatsEnabled(env.getRegionInfo().getTable());
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a67f74d5/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
index 5e03be5..db31b69 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -44,14 +46,33 @@ import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import com.google.common.collect.Sets;
/**
* Simple utility class for managing multiple key parts of the statistic
*/
public class StatisticsUtil {
+
+ private static final Set<TableName> DISABLE_STATS = Sets.newHashSetWithExpectedSize(8);
+ // TODO: make this declarative through new DISABLE_STATS column on SYSTEM.CATALOG table.
+ // Also useful would be a USE_CURRENT_TIME_FOR_STATS column on SYSTEM.CATALOG table.
+ static {
+ DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME));
+ DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME));
+ DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME));
+ DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME));
+ DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,true));
+ DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES,true));
+ DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME_BYTES,true));
+ DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES,true));
+ }
+
private StatisticsUtil() {
// private ctor for utility classes
}
+
/** Number of parts in our complex key */
protected static final int NUM_KEY_PARTS = 3;
@@ -227,4 +248,10 @@ public class StatisticsUtil {
}
return ByteUtil.EMPTY_BYTE_ARRAY;
}
+
+ public static boolean isStatsEnabled(TableName tableName) {
+ return !DISABLE_STATS.contains(tableName);
+ }
+
+
}
\ No newline at end of file