You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2015/08/24 22:47:26 UTC
hbase git commit: HBASE-14224 Fix coprocessor handling of duplicate
classes
Repository: hbase
Updated Branches:
refs/heads/master 72f748f8b -> d0873f5a8
HBASE-14224 Fix coprocessor handling of duplicate classes
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d0873f5a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d0873f5a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d0873f5a
Branch: refs/heads/master
Commit: d0873f5a8cc060adbc5a1ae0ed52b84a8942a868
Parents: 72f748f
Author: stack <st...@apache.org>
Authored: Mon Aug 24 13:47:19 2015 -0700
Committer: stack <st...@apache.org>
Committed: Mon Aug 24 13:47:19 2015 -0700
----------------------------------------------------------------------
.../apache/hadoop/hbase/HTableDescriptor.java | 112 +++++++++++------
.../hadoop/hbase/TestHTableDescriptor.java | 43 +++++++
.../org/apache/hadoop/hbase/HConstants.java | 9 ++
.../hbase/coprocessor/CoprocessorHost.java | 14 +--
.../hbase/coprocessor/TestCoprocessorHost.java | 124 +++++++++++++++++++
hbase-shell/src/main/ruby/hbase/admin.rb | 18 +--
6 files changed, 256 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/d0873f5a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index 938ab68..2c14dee 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -525,8 +525,7 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
* @param key The key.
* @param value The value.
*/
- public HTableDescriptor setValue(final Bytes key,
- final Bytes value) {
+ public HTableDescriptor setValue(final Bytes key, final Bytes value) {
if (key.compareTo(DEFERRED_LOG_FLUSH_KEY) == 0) {
boolean isDeferredFlush = Boolean.valueOf(Bytes.toString(value.get()));
LOG.warn("HTableDescriptor property:" + DEFERRED_LOG_FLUSH + " is deprecated, " +
@@ -534,6 +533,10 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
setDurability(isDeferredFlush ? Durability.ASYNC_WAL : DEFAULT_DURABLITY);
return this;
}
+ Matcher matcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(key.get()));
+ if (matcher.matches()) {
+ LOG.warn("Use addCoprocessor* methods to add a coprocessor instead");
+ }
values.put(key, value);
return this;
}
@@ -1195,7 +1198,6 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
return this.families.remove(column);
}
-
/**
* Add a table coprocessor to this table. The coprocessor
* type must be {@link org.apache.hadoop.hbase.coprocessor.RegionObserver}
@@ -1211,7 +1213,6 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
return this;
}
-
/**
* Add a table coprocessor to this table. The coprocessor
* type must be {@link org.apache.hadoop.hbase.coprocessor.RegionObserver}
@@ -1229,10 +1230,9 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
public HTableDescriptor addCoprocessor(String className, Path jarFilePath,
int priority, final Map<String, String> kvs)
throws IOException {
- if (hasCoprocessor(className)) {
- throw new IOException("Coprocessor " + className + " already exists.");
- }
- // validate parameter kvs
+ checkHasCoprocessor(className);
+
+ // Validate parameter kvs and then add key/values to kvString.
StringBuilder kvString = new StringBuilder();
if (kvs != null) {
for (Map.Entry<String, String> e: kvs.entrySet()) {
@@ -1252,40 +1252,72 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
}
}
+ String value = ((jarFilePath == null)? "" : jarFilePath.toString()) +
+ "|" + className + "|" + Integer.toString(priority) + "|" +
+ kvString.toString();
+ return addCoprocessorToMap(value);
+ }
+
+ /**
+ * Add a table coprocessor to this table. The coprocessor
+ * type must be {@link org.apache.hadoop.hbase.coprocessor.RegionObserver}
+ * or Endpoint.
+ * It won't check if the class can be loaded or not.
+ * Whether a coprocessor is loadable or not will be determined when
+ * a region is opened.
+ * @param specStr The Coprocessor specification all in in one String formatted so matches
+ * {@link HConstants#CP_HTD_ATTR_VALUE_PATTERN}
+ * @throws IOException
+ */
+ // Pity about ugly method name. addCoprocessor(String) already taken above.
+ public HTableDescriptor addCoprocessorWithSpec(final String specStr) throws IOException {
+ String className = getCoprocessorClassNameFromSpecStr(specStr);
+ if (className == null) {
+ throw new IllegalArgumentException("Format does not match " +
+ HConstants.CP_HTD_ATTR_VALUE_PATTERN + ": " + specStr);
+ }
+ checkHasCoprocessor(className);
+ return addCoprocessorToMap(specStr);
+ }
+
+ private void checkHasCoprocessor(final String className) throws IOException {
+ if (hasCoprocessor(className)) {
+ throw new IOException("Coprocessor " + className + " already exists.");
+ }
+ }
+
+ /**
+ * Add coprocessor to values Map
+ * @param specStr The Coprocessor specification all in in one String formatted so matches
+ * {@link HConstants#CP_HTD_ATTR_VALUE_PATTERN}
+ * @return Returns <code>this</code>
+ */
+ private HTableDescriptor addCoprocessorToMap(final String specStr) {
+ if (specStr == null) return this;
// generate a coprocessor key
int maxCoprocessorNumber = 0;
Matcher keyMatcher;
- for (Map.Entry<Bytes, Bytes> e :
- this.values.entrySet()) {
- keyMatcher =
- HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(
- Bytes.toString(e.getKey().get()));
+ for (Map.Entry<Bytes, Bytes> e: this.values.entrySet()) {
+ keyMatcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(e.getKey().get()));
if (!keyMatcher.matches()) {
continue;
}
- maxCoprocessorNumber = Math.max(Integer.parseInt(keyMatcher.group(1)),
- maxCoprocessorNumber);
+ maxCoprocessorNumber = Math.max(Integer.parseInt(keyMatcher.group(1)), maxCoprocessorNumber);
}
maxCoprocessorNumber++;
-
String key = "coprocessor$" + Integer.toString(maxCoprocessorNumber);
- String value = ((jarFilePath == null)? "" : jarFilePath.toString()) +
- "|" + className + "|" + Integer.toString(priority) + "|" +
- kvString.toString();
- setValue(key, value);
+ this.values.put(new Bytes(Bytes.toBytes(key)), new Bytes(Bytes.toBytes(specStr)));
return this;
}
-
/**
* Check if the table has an attached co-processor represented by the name className
*
- * @param className - Class name of the co-processor
+ * @param classNameToMatch - Class name of the co-processor
* @return true of the table has a co-processor className
*/
- public boolean hasCoprocessor(String className) {
+ public boolean hasCoprocessor(String classNameToMatch) {
Matcher keyMatcher;
- Matcher valueMatcher;
for (Map.Entry<Bytes, Bytes> e :
this.values.entrySet()) {
keyMatcher =
@@ -1294,15 +1326,9 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
if (!keyMatcher.matches()) {
continue;
}
- valueMatcher =
- HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(
- Bytes.toString(e.getValue().get()));
- if (!valueMatcher.matches()) {
- continue;
- }
- // get className and compare
- String clazz = valueMatcher.group(2).trim(); // classname is the 2nd field
- if (clazz.equals(className.trim())) {
+ String className = getCoprocessorClassNameFromSpecStr(Bytes.toString(e.getValue().get()));
+ if (className == null) continue;
+ if (className.equals(classNameToMatch.trim())) {
return true;
}
}
@@ -1317,23 +1343,29 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
public List<String> getCoprocessors() {
List<String> result = new ArrayList<String>();
Matcher keyMatcher;
- Matcher valueMatcher;
for (Map.Entry<Bytes, Bytes> e : this.values.entrySet()) {
keyMatcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(e.getKey().get()));
if (!keyMatcher.matches()) {
continue;
}
- valueMatcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(Bytes
- .toString(e.getValue().get()));
- if (!valueMatcher.matches()) {
- continue;
- }
- result.add(valueMatcher.group(2).trim()); // classname is the 2nd field
+ String className = getCoprocessorClassNameFromSpecStr(Bytes.toString(e.getValue().get()));
+ if (className == null) continue;
+ result.add(className); // classname is the 2nd field
}
return result;
}
/**
+ * @param spec String formatted as per {@link HConstants#CP_HTD_ATTR_VALUE_PATTERN}
+ * @return Class parsed from passed in <code>spec</code> or null if no match or classpath found
+ */
+ private static String getCoprocessorClassNameFromSpecStr(final String spec) {
+ Matcher matcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(spec);
+ // Classname is the 2nd field
+ return matcher != null && matcher.matches()? matcher.group(2).trim(): null;
+ }
+
+ /**
* Remove a coprocessor from those set on the table
* @param className Class name of the co-processor
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/d0873f5a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
index 0f37064..680f2c1 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
@@ -43,6 +43,49 @@ import org.junit.experimental.categories.Category;
public class TestHTableDescriptor {
private static final Log LOG = LogFactory.getLog(TestHTableDescriptor.class);
+ @Test (expected=IOException.class)
+ public void testAddCoprocessorTwice() throws IOException {
+ HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME);
+ String cpName = "a.b.c.d";
+ htd.addCoprocessor(cpName);
+ htd.addCoprocessor(cpName);
+ }
+
+ @Test
+ public void testAddCoprocessorWithSpecStr() throws IOException {
+ HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME);
+ String cpName = "a.b.c.d";
+ boolean expected = false;
+ try {
+ htd.addCoprocessorWithSpec(cpName);
+ } catch (IllegalArgumentException iae) {
+ expected = true;
+ }
+ if (!expected) fail();
+ // Try minimal spec.
+ try {
+ htd.addCoprocessorWithSpec("file:///some/path" + "|" + cpName);
+ } catch (IllegalArgumentException iae) {
+ expected = false;
+ }
+ if (expected) fail();
+ // Try more spec.
+ String spec = "hdfs:///foo.jar|com.foo.FooRegionObserver|1001|arg1=1,arg2=2";
+ try {
+ htd.addCoprocessorWithSpec(spec);
+ } catch (IllegalArgumentException iae) {
+ expected = false;
+ }
+ if (expected) fail();
+ // Try double add of same coprocessor
+ try {
+ htd.addCoprocessorWithSpec(spec);
+ } catch (IOException ioe) {
+ expected = true;
+ }
+ if (!expected) fail();
+ }
+
@Test
public void testPb() throws DeserializationException, IOException {
HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME);
http://git-wip-us.apache.org/repos/asf/hbase/blob/d0873f5a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index c0d6740..d2230b9 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -881,6 +881,15 @@ public final class HConstants {
public static final Pattern CP_HTD_ATTR_KEY_PATTERN =
Pattern.compile("^coprocessor\\$([0-9]+)$", Pattern.CASE_INSENSITIVE);
+
+ /**
+ * Pattern that matches a coprocessor specification. Form is:
+ * <code>
+ *<coprocessor jar file location> '|' <<class name> ['|' <priority> ['|' <arguments>]]
+ * </code>
+ * ...where arguments are <code><KEY> '=' <VALUE> [,...]</code>
+ * <p>For example: <code>hdfs:///foo.jar|com.foo.FooRegionObserver|1001|arg1=1,arg2=2</code>
+ */
public static final Pattern CP_HTD_ATTR_VALUE_PATTERN =
Pattern.compile("(^[^\\|]*)\\|([^\\|]+)\\|[\\s]*([\\d]*)[\\s]*(\\|.*)?$");
http://git-wip-us.apache.org/repos/asf/hbase/blob/d0873f5a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
index b047d33..61e76fb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
@@ -117,14 +117,14 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
*/
public Set<String> getCoprocessors() {
Set<String> returnValue = new TreeSet<String>();
- for(CoprocessorEnvironment e: coprocessors) {
+ for (CoprocessorEnvironment e: coprocessors) {
returnValue.add(e.getInstance().getClass().getSimpleName());
}
return returnValue;
}
/**
- * Load system coprocessors. Read the class names from configuration.
+ * Load system coprocessors once only. Read the class names from configuration.
* Called by constructor.
*/
protected void loadSystemCoprocessors(Configuration conf, String confKey) {
@@ -142,17 +142,20 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
return;
int priority = Coprocessor.PRIORITY_SYSTEM;
- List<E> configured = new ArrayList<E>();
for (String className : defaultCPClasses) {
className = className.trim();
if (findCoprocessor(className) != null) {
+ // If already loaded will just continue
+ LOG.warn("Attempted duplicate loading of " + className + "; skipped");
continue;
}
ClassLoader cl = this.getClass().getClassLoader();
Thread.currentThread().setContextClassLoader(cl);
try {
implClass = cl.loadClass(className);
- configured.add(loadInstance(implClass, Coprocessor.PRIORITY_SYSTEM, conf));
+ // Add coprocessors as we go to guard against case where a coprocessor is specified twice
+ // in the configuration
+ this.coprocessors.add(loadInstance(implClass, Coprocessor.PRIORITY_SYSTEM, conf));
LOG.info("System coprocessor " + className + " was loaded " +
"successfully with priority (" + priority++ + ").");
} catch (Throwable t) {
@@ -160,9 +163,6 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
abortServer(className, t);
}
}
-
- // add entire set to the collection for COW efficiency
- coprocessors.addAll(configured);
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/d0873f5a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorHost.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorHost.java
new file mode 100644
index 0000000..0ec7864
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorHost.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({SmallTests.class})
+public class TestCoprocessorHost {
+ /**
+ * An {@link Abortable} implementation for tests.
+ */
+ class TestAbortable implements Abortable {
+ private volatile boolean aborted = false;
+
+ @Override
+ public void abort(String why, Throwable e) {
+ this.aborted = true;
+ Assert.fail();
+ }
+
+ @Override
+ public boolean isAborted() {
+ return this.aborted;
+ }
+ }
+
+ @Test
+ public void testDoubleLoading() {
+ final Configuration conf = HBaseConfiguration.create();
+ CoprocessorHost<CoprocessorEnvironment> host =
+ new CoprocessorHost<CoprocessorEnvironment>(new TestAbortable()) {
+ final Configuration cpHostConf = conf;
+
+ @Override
+ public CoprocessorEnvironment createEnvironment(Class<?> implClass,
+ final Coprocessor instance, int priority, int sequence, Configuration conf) {
+ return new CoprocessorEnvironment() {
+ final Coprocessor envInstance = instance;
+
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+
+ @Override
+ public String getHBaseVersion() {
+ return "0.0.0";
+ }
+
+ @Override
+ public Coprocessor getInstance() {
+ return envInstance;
+ }
+
+ @Override
+ public int getPriority() {
+ return 0;
+ }
+
+ @Override
+ public int getLoadSequence() {
+ return 0;
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return cpHostConf;
+ }
+
+ @Override
+ public Table getTable(TableName tableName) throws IOException {
+ return null;
+ }
+
+ @Override
+ public Table getTable(TableName tableName, ExecutorService service) throws IOException {
+ return null;
+ }
+
+ @Override
+ public ClassLoader getClassLoader() {
+ return null;
+ }
+ };
+ }
+ };
+ final String key = "KEY";
+ final String coprocessor = "org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver";
+ // Try and load coprocessor three times.
+ conf.setStrings(key, coprocessor, coprocessor, coprocessor);
+ host.loadSystemCoprocessors(conf, key);
+ // Only one coprocessor loaded
+ Assert.assertEquals(1, host.coprocessors.size());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/d0873f5a/hbase-shell/src/main/ruby/hbase/admin.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb
index 3afc807..48234f3 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -582,25 +582,9 @@ module Hbase
k.strip!
if (k =~ /coprocessor/i)
- # validate coprocessor specs
v = String.new(value)
v.strip!
- if !(v =~ /^([^\|]*)\|([^\|]+)\|[\s]*([\d]*)[\s]*(\|.*)?$/)
- raise ArgumentError, "Coprocessor value doesn't match spec: #{v}"
- end
-
- # generate a coprocessor ordinal by checking max id of existing cps
- maxId = 0
- htd.getValues().each do |k1, v1|
- attrName = org.apache.hadoop.hbase.util.Bytes.toString(k1.get())
- # a cp key is coprocessor$(\d)
- if (attrName =~ /coprocessor\$(\d+)/i)
- ids = attrName.scan(/coprocessor\$(\d+)/i)
- maxId = ids[0][0].to_i if ids[0][0].to_i > maxId
- end
- end
- maxId += 1
- htd.setValue(k + "\$" + maxId.to_s, value)
+ htd.addCoprocessor(v)
valid_coproc_keys << key
end
end