You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by an...@apache.org on 2016/07/12 17:43:15 UTC

phoenix git commit: PHOENIX-3045 Data regions in transition forever if RS holding them down during drop index

Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 45f0004e0 -> a67f74d58


PHOENIX-3045 Data regions in transition forever if RS holding them down during drop index


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a67f74d5
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a67f74d5
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a67f74d5

Branch: refs/heads/4.x-HBase-0.98
Commit: a67f74d58e80a19acbb81dc266bab1fffea9cfc3
Parents: 45f0004
Author: Ankit Singhal <an...@gmail.com>
Authored: Tue Jul 12 23:12:52 2016 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Tue Jul 12 23:12:52 2016 +0530

----------------------------------------------------------------------
 .../org/apache/phoenix/hbase/index/Indexer.java |  15 +--
 .../phoenix/hbase/index/write/IndexWriter.java  |  14 +-
 .../hbase/index/write/RecoveryIndexWriter.java  | 134 +++++++++++++++++++
 .../phoenix/iterate/BaseResultIterators.java    |   5 +-
 .../stats/StatisticsCollectorFactory.java       |  21 +--
 .../phoenix/schema/stats/StatisticsUtil.java    |  27 ++++
 6 files changed, 176 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a67f74d5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 0aed2a6..2956470 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -63,13 +62,13 @@ import org.apache.phoenix.hbase.index.write.IndexFailurePolicy;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache;
 import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy;
-import org.apache.phoenix.hbase.index.write.recovery.TrackingParallelWriterIndexCommitter;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
 import org.cloudera.htrace.Span;
 import org.cloudera.htrace.Trace;
 import org.cloudera.htrace.TraceScope;
-
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.phoenix.hbase.index.write.RecoveryIndexWriter;
 import com.google.common.collect.Multimap;
 
 /**
@@ -152,11 +151,6 @@ public class Indexer extends BaseRegionObserver {
 
         // setup the actual index writer
         this.writer = new IndexWriter(env, serverName + "-index-writer");
-    
-        // setup the recovery writer that does retries on the failed edits
-        TrackingParallelWriterIndexCommitter recoveryCommmiter =
-            new TrackingParallelWriterIndexCommitter();
-    
         try {
           // get the specified failure policy. We only ever override it in tests, but we need to do it
           // here
@@ -165,10 +159,9 @@ public class Indexer extends BaseRegionObserver {
                 StoreFailuresInCachePolicy.class, IndexFailurePolicy.class);
           IndexFailurePolicy policy =
               policyClass.getConstructor(PerRegionIndexWriteCache.class).newInstance(failedIndexEdits);
-          LOG.debug("Setting up recovery writter with committer: " + recoveryCommmiter.getClass()
-              + " and failure policy: " + policy.getClass());
+          LOG.debug("Setting up recovery writter with failure policy: " + policy.getClass());
           recoveryWriter =
-              new IndexWriter(recoveryCommmiter, policy, env, serverName + "-recovery-writer");
+              new RecoveryIndexWriter(policy, env, serverName + "-recovery-writer");
         } catch (Exception ex) {
           throw new IOException("Could not instantiate recovery failure policy!", ex);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a67f74d5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
index cbcec3b..831aa16 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
@@ -30,13 +30,13 @@ import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Pair;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
 import org.apache.phoenix.hbase.index.exception.IndexWriteException;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
 /**
  * Do the actual work of writing to the index tables. Ensures that if we do fail to write to the
  * index table that we cleanly kill the region/server to ensure that the region's WAL gets replayed.
@@ -171,11 +171,11 @@ public class IndexWriter implements Stoppable {
     	write(resolveTableReferences(toWrite), false);
     }
 
-    public void write(Collection<Pair<Mutation, byte[]>> toWrite, boolean allowLocalUpdates) throws IndexWriteException {
+    public void write(Collection<Pair<Mutation, byte[]>> toWrite, boolean allowLocalUpdates) throws IOException {
     	write(resolveTableReferences(toWrite), allowLocalUpdates);
     }
-
-  /**
+    
+    /**
    * see {@link #write(Collection)}
    * @param toWrite
    * @throws IndexWriteException
@@ -190,7 +190,7 @@ public class IndexWriter implements Stoppable {
    * @param indexUpdates from the index builder
    * @return pairs that can then be written by an {@link IndexWriter}.
    */
-  public static Multimap<HTableInterfaceReference, Mutation> resolveTableReferences(
+  protected Multimap<HTableInterfaceReference, Mutation> resolveTableReferences(
       Collection<Pair<Mutation, byte[]>> indexUpdates) {
     Multimap<HTableInterfaceReference, Mutation> updates = ArrayListMultimap
         .<HTableInterfaceReference, Mutation> create();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a67f74d5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
new file mode 100644
index 0000000..be542bb
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.write.recovery.TrackingParallelWriterIndexCommitter;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * Used to recover failed index edits during WAL replay
+ * <p>
+ * We attempt to do the index updates in parallel using a backing threadpool. All threads are daemon threads, so it will
+ * not block the region from shutting down.
+ */
+public class RecoveryIndexWriter extends IndexWriter {
+
+    private static final Log LOG = LogFactory.getLog(RecoveryIndexWriter.class);
+    private Set<HTableInterfaceReference> nonExistingTablesList = new HashSet<HTableInterfaceReference>();
+    private HBaseAdmin admin;
+
+    /**
+     * Directly specify the {@link IndexCommitter} and {@link IndexFailurePolicy}. Both are expected to be fully setup
+     * before calling.
+     * 
+     * @param committer
+     * @param policy
+     * @param env
+     * @throws IOException
+     * @throws ZooKeeperConnectionException
+     * @throws MasterNotRunningException
+     */
+    public RecoveryIndexWriter(IndexFailurePolicy policy, RegionCoprocessorEnvironment env, String name)
+            throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
+        super(new TrackingParallelWriterIndexCommitter(), policy, env, name);
+        this.admin = new HBaseAdmin(env.getConfiguration());
+    }
+
+    @Override
+    public void write(Collection<Pair<Mutation, byte[]>> toWrite, boolean allowLocalUpdates) throws IOException {
+        try {
+            write(resolveTableReferences(toWrite), allowLocalUpdates);
+        } catch (MultiIndexWriteFailureException e) {
+            for (HTableInterfaceReference table : e.getFailedTables()) {
+                if (!admin.tableExists(table.getTableName())) {
+                    LOG.warn("Failure due to non existing table: " + table.getTableName());
+                    nonExistingTablesList.add(table);
+                } else {
+                    throw e;
+                }
+            }
+        }
+    }
+
+    /**
+     * Convert the passed index updates to {@link HTableInterfaceReference}s.
+     * 
+     * @param indexUpdates
+     *            from the index builder
+     * @return pairs that can then be written by an {@link RecoveryIndexWriter}.
+     */
+    @Override
+    protected Multimap<HTableInterfaceReference, Mutation> resolveTableReferences(
+            Collection<Pair<Mutation, byte[]>> indexUpdates) {
+        Multimap<HTableInterfaceReference, Mutation> updates = ArrayListMultimap
+                .<HTableInterfaceReference, Mutation> create();
+
+        // simple map to make lookups easy while we build the map of tables to create
+        Map<ImmutableBytesPtr, HTableInterfaceReference> tables = new HashMap<ImmutableBytesPtr, HTableInterfaceReference>(
+                updates.size());
+        for (Pair<Mutation, byte[]> entry : indexUpdates) {
+            byte[] tableName = entry.getSecond();
+            ImmutableBytesPtr ptr = new ImmutableBytesPtr(tableName);
+            HTableInterfaceReference table = tables.get(ptr);
+            if (nonExistingTablesList.contains(table)) {
+                LOG.debug("Edits found for non existing table: " + table.getTableName() + " so skipping it!!");
+                continue;
+            }
+            if (table == null) {
+                table = new HTableInterfaceReference(ptr);
+                tables.put(ptr, table);
+            }
+            updates.put(table, entry.getFirst());
+
+        }
+        return updates;
+    }
+
+    @Override
+    public void stop(String why) {
+        super.stop(why);
+        if (admin != null) {
+            try {
+                admin.close();
+            } catch (IOException e) {
+                // closing silently
+            }
+        }
+    }
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a67f74d5/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 4a797b8..c2a97b5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -49,6 +49,7 @@ import java.util.concurrent.TimeoutException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.filter.PageFilter;
@@ -80,11 +81,11 @@ import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTable.ViewType;
-import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.stats.PTableStats;
+import org.apache.phoenix.schema.stats.StatisticsUtil;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.LogUtil;
@@ -360,7 +361,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         if (null == currentSCN) {
           currentSCN = HConstants.LATEST_TIMESTAMP;
         }
-        tableStats = useStats() && table.getType() != PTableType.SYSTEM
+        tableStats = useStats() && StatisticsUtil.isStatsEnabled(TableName.valueOf(physicalTableName))
                 ? context.getConnection().getQueryServices().getTableStats(physicalTableName, currentSCN)
                 : PTableStats.EMPTY_STATS;
         // Used to tie all the scans together during logging

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a67f74d5/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java
index 30c560a..1c65f09 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java
@@ -18,15 +18,9 @@
 package org.apache.phoenix.schema.stats;
 
 import java.io.IOException;
-import java.util.Set;
 
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.util.SchemaUtil;
-
-import com.google.common.collect.Sets;
 
 /**
  * Provides new {@link DefaultStatisticsCollector} instances based on configuration settings for a
@@ -56,19 +50,6 @@ public class StatisticsCollectorFactory {
         }
     }
 
-    // TODO: make this declarative through new DISABLE_STATS column on SYSTEM.CATALOG table.
-    // Also useful would be a USE_CURRENT_TIME_FOR_STATS column on SYSTEM.CATALOG table.
-    private static final Set<TableName> DISABLE_STATS = Sets.newHashSetWithExpectedSize(3);
-    static {
-        DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME));
-        DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME));
-        DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME));
-        DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME));
-        DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,true));
-        DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,true));
-        DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,true));
-        DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,true));
-    }
     
     /**
      * Determines if statistics are enabled (which is the default). This is done on the
@@ -78,7 +59,7 @@ public class StatisticsCollectorFactory {
      */
     private static boolean statisticsEnabled(RegionCoprocessorEnvironment env) {
         return env.getConfiguration().getBoolean(QueryServices.STATS_ENABLED_ATTRIB, true) &&
-                !DISABLE_STATS.contains(env.getRegionInfo().getTable());
+                StatisticsUtil.isStatsEnabled(env.getRegionInfo().getTable());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a67f74d5/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
index 5e03be5..db31b69 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
@@ -30,6 +31,7 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -44,14 +46,33 @@ import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import com.google.common.collect.Sets;
 
 /**
  * Simple utility class for managing multiple key parts of the statistic
  */
 public class StatisticsUtil {
+    
+    private static final Set<TableName> DISABLE_STATS = Sets.newHashSetWithExpectedSize(8);
+    // TODO: make this declarative through new DISABLE_STATS column on SYSTEM.CATALOG table.
+    // Also useful would be a USE_CURRENT_TIME_FOR_STATS column on SYSTEM.CATALOG table.
+    static {
+        DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME));
+        DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME));
+        DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME));
+        DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME));
+        DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,true));
+        DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES,true));
+        DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME_BYTES,true));
+        DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES,true));
+    }
+    
     private StatisticsUtil() {
         // private ctor for utility classes
     }
+    
 
     /** Number of parts in our complex key */
     protected static final int NUM_KEY_PARTS = 3;
@@ -227,4 +248,10 @@ public class StatisticsUtil {
 	    }
 	    return ByteUtil.EMPTY_BYTE_ARRAY;
 	}
+
+    public static boolean isStatsEnabled(TableName tableName) {
+        return !DISABLE_STATS.contains(tableName);
+    }
+	
+	
 }
\ No newline at end of file