You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2014/08/01 00:08:04 UTC
[28/50] [abbrv] git commit: [master] Change configuration of
observers also to support project@version format
[master] Change configuration of observers also to support project@version format
Summary:
After this diff, each entry is in one of the following two formats:
project@version->table[,table]
or
className->table[,table]
Test Plan:
`TestClassLoading`
`TestEndpointReload`
Reviewers: manukranthk, adela
Reviewed By: adela
Subscribers: hbase-eng@
Differential Revision: https://phabricator.fb.com/D1352561
Tasks: 4414918
git-svn-id: svn+ssh://tubbs/svnhive/hadoop/branches/titan/VENDOR.hbase/hbase-trunk@43112 e7acf4d4-3532-417f-9e73-7a9ae25a1f51
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a45454db
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a45454db
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a45454db
Branch: refs/heads/0.89-fb
Commit: a45454db37e1a663a42674a083d69911cfc7aee2
Parents: f0eccb5
Author: daviddeng <da...@e7acf4d4-3532-417f-9e73-7a9ae25a1f51>
Authored: Tue Jun 24 19:57:37 2014 +0000
Committer: Elliott Clark <el...@fb.com>
Committed: Thu Jul 31 14:44:23 2014 -0700
----------------------------------------------------------------------
.../hbase/coprocessor/CoprocessorHost.java | 199 +++++++++++++++----
.../coprocessor/RegionCoprocessorHost.java | 6 +-
.../coprocessor/endpoints/EndpointLoader.java | 101 ++--------
.../endpoints/TestEndpointReload.java | 4 +-
.../TestHRegionObserverBypassCoprocessor.java | 6 +-
.../util/coprocessor/TestClassLoading.java | 91 +++++----
6 files changed, 237 insertions(+), 170 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/a45454db/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
index 6e98f57..b04a57a 100644
--- a/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
+++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
@@ -31,17 +31,22 @@ import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.ipc.ThriftClientInterface;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.util.coprocessor.CoprocessorClassLoader;
@@ -89,6 +94,8 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
// unique file prefix to use for local copies of jars when classloading
protected String pathPrefix;
protected AtomicInteger loadSequence = new AtomicInteger();
+ public static final Pattern PAT_PROJECT_VERSION =
+ Pattern.compile("[^@]+@[0-9]+");
/**
* The field name for "loaded classes"
*/
@@ -204,14 +211,15 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
/**
* Read coprocessors that should be loaded from configuration. In the
- * configuration we should have string of tuples of [table, jar, class] for
- * each usage of coprocesssor
+ * configuration we should have string of tuples of
+ * "project@version->table[,table]" or "classname->table[,table]"
+ * for each usage of coprocessor
*
* @param conf
*/
- public Map<String, List<Pair<String, String>>> readCoprocessorsFromConf(
+ public Map<String, List<String>> readCoprocessorsFromConf(
Configuration conf) {
- Map<String, List<Pair<String, String>>> coprocConfigMap = new HashMap<>();
+ Map<String, List<String>> coprocConfigMap = new HashMap<>();
String fromConfig = conf.get(USER_REGION_COPROCESSOR_FROM_HDFS_KEY);
if (fromConfig == null || fromConfig.isEmpty()) {
return coprocConfigMap;
@@ -219,27 +227,29 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
String[] tuples = fromConfig.split(";");
for (String tuple : tuples) {
tuple = tuple.trim();
- String[] str = tuple.split(",");
- // str[0] is table, str[1] is path for the jar, str[2] is name of the
- // class
- if (str.length != 3) {
- LOG.error("Configuration is not in expected format: " + str);
- return null;
+ String[] pathAndTables = tuple.split("->", 2);
+ if (pathAndTables.length != 2) {
+ LOG.error("Configuration is not in expected format: " + tuple);
+ // Ignore this tuple
+ continue;
}
- String table = str[0].trim();
- List<Pair<String, String>> inMap = coprocConfigMap.get(table);
- if (inMap == null) {
- inMap = new ArrayList<>();
- coprocConfigMap.put(table, inMap);
+ String path = pathAndTables[0].trim();
+ String[] tables = pathAndTables[1].trim().split(",");
+ for (String table : tables) {
+ List<String> inMap = coprocConfigMap.get(table);
+ if (inMap == null) {
+ inMap = new ArrayList<>();
+ coprocConfigMap.put(table, inMap);
+ }
+ inMap.add(path);
}
- inMap.add(new Pair<>(str[1].trim(), str[2].trim()));
}
return coprocConfigMap;
}
/**
* Checks if the specified path for the coprocessor jar is in expected format
- * TODO: make this more advanced - currently is very hardoced and dummy
+ * TODO: make this more advanced - currently is very hardcoded and dummy
*
* @param path
* - Absolute and complete path where the coprocessor jar resides
@@ -261,25 +271,23 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
}
/**
- * Used to load coprocessors whose jar is specified via hdfs path. All
+ * Used to load coprocessors whose jar is specified via HDFS path. All
* existing coprocessors will be unloaded (if they appear again in the new
* configuration then they will be re-added).
*
* @param config
* @throws IOException
*/
- protected void reloadCoprocessorsFromHdfs(String table,
- List<Pair<String, String>> toLoad, Configuration config)
- throws IOException {
+ protected void reloadCoprocessorsFromHdfs(String table, List<String> toLoad,
+ Configuration config) throws IOException {
// remove whatever is loaded first
removeLoadedCoprocWithKey(USER_REGION_COPROCESSOR_FROM_HDFS_KEY);
List<E> newCoprocessors = new ArrayList<>();
LOG.info("Loading coprocessor for table: " + table);
- for (Pair<String, String> pair : toLoad) {
- LOG.info("Jar: " + pair.getFirst() + " class: " + pair.getSecond());
- E coproc = load(new Path(pair.getFirst()), pair.getSecond(),
- Coprocessor.PRIORITY_USER, config,
+ for (String path : toLoad) {
+ LOG.info("Loading from " + path);
+ E coproc = load(path, Coprocessor.PRIORITY_USER, config,
USER_REGION_COPROCESSOR_FROM_HDFS_KEY);
newCoprocessors.add(coproc);
}
@@ -300,38 +308,48 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
* @param conf configuration for coprocessor
* @throws java.io.IOException Exception
*/
- public E load(Path path, String className, int priority,
- Configuration conf, String confKey) throws IOException {
+ public E load(String path, int priority, Configuration conf, String confKey)
+ throws IOException {
Class<?> implClass = null;
- LOG.debug("Loading coprocessor class " + className + " with path " +
- path + " and priority " + priority);
-
ClassLoader cl = null;
- if (path == null) {
- try {
- implClass = getClass().getClassLoader().loadClass(className);
- } catch (ClassNotFoundException e) {
- throw new IOException("No jar path specified for " + className);
+ if (PAT_PROJECT_VERSION.matcher(path).matches()) {
+ String[] projVer = path.split("@");
+ String project = projVer[0];
+ int version = Integer.parseInt(projVer[1]);
+
+ FileSystem fs = FileSystem.get(conf);
+ String dfsRoot = getCoprocessorDfsRoot(conf);
+
+ Pair<List<Class<?>>, ClassLoader> classesAndLoader =
+ getProjectClassesAndLoader(conf, fs, dfsRoot, project, version,
+ pathPrefix);
+ if (classesAndLoader.getFirst().size() != 1) {
+ // The number of loaded classes should be one for observers.
+ throw new IOException("The number of loaded classes for observers "
+ + "should be one but we got: " + classesAndLoader.getFirst());
}
+ implClass = classesAndLoader.getFirst().get(0);
+ cl = classesAndLoader.getSecond();
} else {
- cl = CoprocessorClassLoader.getClassLoader(
- path, getClass().getClassLoader(), pathPrefix, conf);
+ String className = path;
+ LOG.debug("Loading coprocessor class " + className + " with path " + path
+ + " and priority " + priority);
try {
+ cl = this.getClass().getClassLoader();
implClass = cl.loadClass(className);
} catch (ClassNotFoundException e) {
- throw new IOException("Cannot load external coprocessor class " + className, e);
+ throw new IOException("Cannot load coprocessor class " + className, e);
}
}
+
//load custom code for coprocessor
try (ContextResetter ctxResetter = new ContextResetter(cl)) {
-// switch temporarily to the thread classloader for custom CP
+ // switch temporarily to the thread classloader for custom CP
E cpInstance = loadInstance(implClass, priority, conf, confKey);
return cpInstance;
} catch (Exception e) {
- String msg = new StringBuilder()
- .append("Cannot load external coprocessor class ").append(className)
- .toString();
- throw new IOException(msg, e);
+ throw new IOException("Cannot load external coprocessor class "
+ + implClass.getName(), e);
}
}
@@ -634,6 +652,101 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
}
}
+ public static ClassLoader getDfsClassLoader(Configuration conf,
+ FileSystem fs, Path projectPath, String localPathPrefx)
+ throws IOException {
+ FileStatus[] jars = fs.listStatus(projectPath, new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return path.getName().endsWith(".jar");
+ }
+ });
+
+ if (jars.length < 1) {
+ throw new IOException("No jars in " + projectPath + " found!");
+ }
+
+ // TODO load class based on all jars.
+ return CoprocessorClassLoader.getClassLoader(jars[0].getPath(),
+ CoprocessorHost.class.getClassLoader(), localPathPrefx, conf);
+ }
+
+ /**
+ * Returns an entry string for a version of a project.
+ */
+ public static String genDfsProjectEntry(String project, int version) {
+ return project + "@" + version;
+ }
+
+ /**
+ * Returns an entry string for a version of a project for a table.
+ */
+ public static String genDfsPerTableProjectEntry(String project, int version,
+ String tableName) {
+ return project + "@" + version + "->" + tableName;
+ }
+
+ /**
+ * Returns the list of classes and ClassLoader to load them.
+ *
+ * @param conf The configuration
+ * @param fs The file-system.
+ * @param dfsRootPath The root path of coprocessors on DFS.
+ * @param project The name of the project.
+ * @param version The version of the project to load
+ * @param localPathPrefx The prefix of paths for CoprocessorClassLoader to
+ * copy jars to local.
+ * @return A Pair of classes to load and the ClassLoader.
+ */
+ @SuppressWarnings({ "unchecked" })
+ public static Pair<List<Class<?>>, ClassLoader> getProjectClassesAndLoader(
+ Configuration conf, FileSystem fs,
+ String dfsRootPath, String project, int version, String localPathPrefx)
+ throws IOException {
+ Path projectPath =
+ new Path(getCoprocessorPath(dfsRootPath, project, version));
+ if (!fs.exists(projectPath)) {
+ throw new IOException("Folder " + projectPath + " doesn't exist!");
+ }
+ if (!fs.getFileStatus(projectPath).isDir()) {
+ throw new IOException(projectPath + " is not a folder!");
+ }
+
+ Path configPath = new Path(projectPath, CONFIG_JSON);
+ if (!fs.exists(configPath)) {
+ throw new IOException("Cannot find config file: " + configPath);
+ }
+
+ Map<String, Object> jConf = FSUtils.readJSONFromFile(fs, configPath);
+ String name = (String) jConf.get(COPROCESSOR_JSON_NAME_FIELD);
+ if (!name.equals(project)) {
+ throw new IOException("Loaded name " + name + " is not expected"
+ + project);
+ }
+
+ int loadedVer = (int) jConf.get(COPROCESSOR_JSON_VERSION_FIELD);
+ if (loadedVer != version) {
+ throw new IOException("Loaded version " + loadedVer + " is not expected"
+ + version);
+ }
+
+ ArrayList<String> loadedClasses =
+ (ArrayList<String>) jConf.get(COPROCESSOR_JSON_LOADED_CLASSES_FIELD);
+
+ ClassLoader cl = getDfsClassLoader(conf, fs, projectPath, localPathPrefx);
+ List<Class<?>> res = new ArrayList<>(loadedClasses.size());
+ for (int i = 0; i < loadedClasses.size(); i++) {
+ try {
+ Class<?> cls = cl.loadClass(loadedClasses.get(i));
+ res.add(cls);
+ } catch (Throwable t) {
+ LOG.error("Load class " + loadedClasses.get(i) + " failed, ignored!", t);
+ }
+ }
+
+ return Pair.newPair(res, cl);
+ }
+
/**
* Used just to set the contextClassLoader on the current thread in case of
* exception - code in {@link #close()} will be executed
http://git-wip-us.apache.org/repos/asf/hbase/blob/a45454db/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java
index f39a765..f000099 100644
--- a/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java
+++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.coprocessor.observers.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.observers.RegionObserver;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Pair;
/**
* Implements the coprocessor environment and runtime support for coprocessors
@@ -80,10 +79,10 @@ public class RegionCoprocessorHost extends
* @throws IOException
*/
public void reloadCoprocessors(Configuration newConf) throws IOException {
- Map<String, List<Pair<String, String>>> map = this
+ Map<String, List<String>> map = this
.readCoprocessorsFromConf(newConf);
String coprocTable = this.region.getTableDesc().getNameAsString();
- List<Pair<String, String>> forTable = map.get(coprocTable);
+ List<String> forTable = map.get(coprocTable);
// reload system default cp's from configuration.
reloadSysCoprocessorsOnConfigChange(newConf, REGION_COPROCESSOR_CONF_KEY);
// reload system default cp's for user tables from configuration.
@@ -216,6 +215,7 @@ public class RegionCoprocessorHost extends
}
+ @Override
public void shutdown() {
super.shutdown();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a45454db/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointLoader.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointLoader.java
index 794f91b..55c87e0 100644
--- a/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointLoader.java
+++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointLoader.java
@@ -22,20 +22,14 @@ package org.apache.hadoop.hbase.coprocessor.endpoints;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
-import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.coprocessor.CoprocessorClassLoader;
+import org.apache.hadoop.hbase.util.Pair;
/**
* Static methods for loading endpoints.
@@ -55,86 +49,29 @@ public class EndpointLoader {
private static final String dfsTmpPrefix = "endpoint."
+ UUID.randomUUID().toString();
- private static final Pattern PAT_PROJECT_VERSION =
- Pattern.compile("[^@]+@[0-9]+");
-
- /**
- * Returns an entry string for a version of a project.
- */
- public static String genDfsEndpointEntry(String project, int version) {
- return project + "@" + version;
- }
-
- private static ClassLoader getDfsClassLoader(Configuration conf,
- FileSystem fs, Path projectPath) throws IOException {
- FileStatus[] jars = fs.listStatus(projectPath, new PathFilter() {
- @Override
- public boolean accept(Path path) {
- return path.getName().endsWith(".jar");
- }
- });
-
- if (jars.length < 1) {
- throw new IOException("No jars in " + projectPath + " found!");
- }
-
- // TODO load class based on all jars.
- return CoprocessorClassLoader.getClassLoader(jars[0].getPath(),
- EndpointLoader.class.getClassLoader(), dfsTmpPrefix, conf);
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- private static List<IEndpointFactory> loadProject(Configuration conf,
+ @SuppressWarnings({ "rawtypes" })
+ private static List<IEndpointFactory> loadEndpointProject(Configuration conf,
FileSystem fs, String dfsRootPath, String project, int version)
throws IOException {
- Path projectPath = new Path(CoprocessorHost.getCoprocessorPath(dfsRootPath,
- project, version));
- if (!fs.exists(projectPath)) {
- throw new IOException("Folder " + projectPath + " doesn't exist!");
- }
- if (!fs.getFileStatus(projectPath).isDir()) {
- throw new IOException(projectPath + " is not a folder!");
- }
-
- Path configPath = new Path(projectPath, CoprocessorHost.CONFIG_JSON);
- if (!fs.exists(configPath)) {
- throw new IOException("Cannot find config file: " + configPath);
- }
-
- Map<String, Object> jConf = FSUtils.readJSONFromFile(fs, configPath);
- String name = (String) jConf.get(
- CoprocessorHost.COPROCESSOR_JSON_NAME_FIELD);
- if (!name.equals(project)) {
- throw new IOException("Loaded name " + name + " is not expected"
- + project);
- }
- int loadedVer =
- (int) jConf.get(CoprocessorHost.COPROCESSOR_JSON_VERSION_FIELD);
- if (loadedVer != version) {
- throw new IOException("Loaded version " + loadedVer + " is not expected"
- + version);
- }
-
- ArrayList<String> loadedClasses = (ArrayList<String>) jConf.get(
- CoprocessorHost.COPROCESSOR_JSON_LOADED_CLASSES_FIELD);
-
- ClassLoader cl = getDfsClassLoader(conf, fs, projectPath);
- List<IEndpointFactory> res = new ArrayList<>(loadedClasses.size());
- for (int i = 0; i < loadedClasses.size(); i++) {
+ Pair<List<Class<?>>, ClassLoader> classesAndLoader =
+ CoprocessorHost.getProjectClassesAndLoader(conf, fs, dfsRootPath, project, version,
+ dfsTmpPrefix);
+
+ List<IEndpointFactory> res = new ArrayList<>(
+ classesAndLoader.getFirst().size());
+ for (Class<?> cls: classesAndLoader.getFirst()) {
+ if (!IEndpointFactory.class.isAssignableFrom(cls)) {
+ LOG.info("Class " + cls + " cannot be assigned as "
+ + IEndpointFactory.class + ", ignored!");
+ continue;
+ }
try {
- Class<?> cls = cl.loadClass(loadedClasses.get(i));
- if (!IEndpointFactory.class.isAssignableFrom(cls)) {
- LOG.info("Class " + cls + " cannot be assigned as "
- + IEndpointFactory.class + ", ignored!");
- continue;
- }
res.add((IEndpointFactory) cls.newInstance());
} catch (Throwable t) {
- LOG.error("Load class " + loadedClasses.get(i) + " failed!", t);
+ LOG.info("Creating instance of " + cls + " failed!", t);
}
}
-
return res;
}
@@ -154,7 +91,7 @@ public class EndpointLoader {
for (String className : classNames) {
try {
- if (PAT_PROJECT_VERSION.matcher(className).matches()) {
+ if (CoprocessorHost.PAT_PROJECT_VERSION.matcher(className).matches()) {
String[] projVer = className.split("@");
String project = projVer[0];
int version = Integer.parseInt(projVer[1]);
@@ -162,8 +99,8 @@ public class EndpointLoader {
if (fs == null) {
fs = FileSystem.get(conf);
}
- List<IEndpointFactory> factories = loadProject(conf, fs, dfsRoot,
- project, version);
+ List<IEndpointFactory> factories = loadEndpointProject(conf, fs,
+ dfsRoot, project, version);
for (IEndpointFactory factory : factories) {
LOG.info("Loading endpoint class " + factory.getClass().getName());
reg.register(factory.getEndpointInterface(), factory);
http://git-wip-us.apache.org/repos/asf/hbase/blob/a45454db/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpointReload.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpointReload.java b/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpointReload.java
index 872758b..58310d0 100644
--- a/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpointReload.java
+++ b/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpointReload.java
@@ -244,14 +244,14 @@ public class TestEndpointReload {
// two
conf.setStrings(EndpointLoader.FACTORY_CLASSES_KEY,
- EndpointLoader.genDfsEndpointEntry(TWO_NAME, TWO_VERSION));
+ CoprocessorHost.genDfsProjectEntry(TWO_NAME, TWO_VERSION));
HRegionServer.configurationManager.notifyAllObservers(conf);
checkEndpionts(cp, jarFile, false, true);
// one+two
conf.setStrings(EndpointLoader.FACTORY_CLASSES_KEY,
OneFactory.class.getName(),
- EndpointLoader.genDfsEndpointEntry(TWO_NAME, TWO_VERSION));
+ CoprocessorHost.genDfsProjectEntry(TWO_NAME, TWO_VERSION));
HRegionServer.configurationManager.notifyAllObservers(conf);
checkEndpionts(cp, jarFile, true, true);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a45454db/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java b/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java
index 08a2e20..ce230ae 100644
--- a/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java
+++ b/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java
@@ -134,9 +134,9 @@ public class TestHRegionObserverBypassCoprocessor {
HTable table = new HTable(util.getConfiguration(), tableName);
Set<String> allCoprocessors = RegionCoprocessorHost
.getEverLoadedCoprocessors();
- Assert.assertEquals("There should be only one coprocessor everloaded", 1, allCoprocessors.size());
- Assert
- .assertEquals(
+ Assert.assertEquals("There should be only one coprocessor everloaded", 1,
+ allCoprocessors.size());
+ Assert.assertEquals(
"Expected loaded coprocessor is different from one which is currently loaded",
TestCoprocessor.class.getName(), allCoprocessors.toArray()[0]);
Configuration conf = util.getConfiguration();
http://git-wip-us.apache.org/repos/asf/hbase/blob/a45454db/src/test/java/org/apache/hadoop/hbase/util/coprocessor/TestClassLoading.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/hbase/util/coprocessor/TestClassLoading.java b/src/test/java/org/apache/hadoop/hbase/util/coprocessor/TestClassLoading.java
index b787b60..569c663 100644
--- a/src/test/java/org/apache/hadoop/hbase/util/coprocessor/TestClassLoading.java
+++ b/src/test/java/org/apache/hadoop/hbase/util/coprocessor/TestClassLoading.java
@@ -19,27 +19,28 @@
package org.apache.hadoop.hbase.util.coprocessor;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import java.io.File;
+import java.io.OutputStream;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorAdmin;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.observers.TestHRegionObserverBypassCoprocessor.TestCoprocessor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -49,13 +50,10 @@ import org.junit.Test;
* Test coprocessors class loading.
*/
public class TestClassLoading {
- private static final Log LOG = LogFactory.getLog(TestClassLoading.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final byte[] DUMMY = Bytes.toBytes("dummy");
private static final byte[] TEST = Bytes.toBytes("test");
- private static MiniDFSCluster cluster;
-
static final String tableName = "TestClassLoading";
static final String cpName1 = "TestCP1";
static final String cpName2 = "TestCP2";
@@ -69,7 +67,6 @@ public class TestClassLoading {
conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
testCoprocessor.getName());
TEST_UTIL.startMiniCluster(1);
- cluster = TEST_UTIL.getDFSCluster();
}
@AfterClass
@@ -104,38 +101,58 @@ public class TestClassLoading {
return newStr;
}
+ private void uploadCoprocessor(File jarFile, String name, int version,
+ String className) throws Exception {
+ Path localPath =
+ TEST_UTIL.getTestDir("TestClassLoading-uploadCoprocessor-" + name
+ + "-" + version);
+
+ LocalFileSystem lfs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
+ lfs.delete(localPath, true);
+
+ lfs.mkdirs(localPath);
+ lfs.copyFromLocalFile(false, new Path(jarFile.toString()), new Path(
+ localPath, jarFile.getName()));
+
+ JsonFactory f = new JsonFactory();
+ try (OutputStream out =
+ lfs.create(new Path(localPath, CoprocessorHost.CONFIG_JSON))) {
+ JsonGenerator g = f.createJsonGenerator(out);
+ g.writeStartObject();
+
+ g.writeStringField(CoprocessorHost.COPROCESSOR_JSON_NAME_FIELD, name);
+
+ g.writeNumberField(CoprocessorHost.COPROCESSOR_JSON_VERSION_FIELD,
+ version);
+
+ g.writeArrayFieldStart(CoprocessorHost.COPROCESSOR_JSON_LOADED_CLASSES_FIELD);
+ g.writeString(className);
+ g.writeEndArray();
+ g.writeEndObject();
+ g.close();
+ }
+
+ new CoprocessorAdmin(TEST_UTIL.getConfiguration()).upload(localPath, true);
+ }
+
@Test
public void testClassLoadingFromHDFS() throws Exception {
- FileSystem fs = cluster.getFileSystem();
+ final String CP_NAME = "test";
+
+ // Create jar files.
File jarFile1 = buildCoprocessorJar(cpName1);
System.out.println(jarFile1.getName());
File jarFile2 = buildCoprocessorJar(cpName2);
- // have to create directories because we are not placing the files on hdfs
- // root
- assertTrue(fs.mkdirs(new Path("/coprocessors/test/1")));
- // copy the jars into dfs
- fs.moveFromLocalFile(new Path(jarFile1.getPath()), new Path(fs.getUri()
- .toString(), buildCorrectPathForCoprocessorJar("") + Path.SEPARATOR));
-
- String jarFileOnHDFS1 = buildCorrectPathForCoprocessorJar(fs.getUri()
- .toString()) + Path.SEPARATOR + jarFile1.getName();
- Path pathOnHDFS1 = new Path(jarFileOnHDFS1);
- System.out.println(jarFileOnHDFS1);
- assertTrue("Copy jar file to HDFS failed.", fs.exists(pathOnHDFS1));
- LOG.info("Copied jar file to HDFS: " + jarFileOnHDFS1);
-
- fs.moveFromLocalFile(new Path(jarFile2.getPath()), new Path(fs.getUri()
- .toString(), buildCorrectPathForCoprocessorJar("") + Path.SEPARATOR));
-
- String jarFileOnHDFS2 = buildCorrectPathForCoprocessorJar(fs.getUri()
- .toString()) + Path.SEPARATOR + jarFile2.getName();
- Path pathOnHDFS2 = new Path(jarFileOnHDFS2);
- assertTrue("Copy jar file to HDFS failed.", fs.exists(pathOnHDFS2));
- LOG.info("Copied jar file to HDFS: " + jarFileOnHDFS2);
+ System.out.println(jarFile2.getName());
+ // Upload jars to dfs
+ uploadCoprocessor(jarFile1, CP_NAME, 1, cpName1);
+ uploadCoprocessor(jarFile2, CP_NAME, 2, cpName2);
+
Configuration conf = TEST_UTIL.getConfiguration();
// check if only TestCoprocessor is currently loaded
- List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegions(Bytes.toBytes(tableName));
+ List<HRegion> regions =
+ TEST_UTIL.getMiniHBaseCluster().getRegions(Bytes.toBytes(tableName));
Set<String> expectedCoprocessorSimpleName = new HashSet<>();
Set<String> allCoprocessors = RegionCoprocessorHost
.getEverLoadedCoprocessors();
@@ -148,7 +165,7 @@ public class TestClassLoading {
// remove the firstly added coprocessor
conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, "");
conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_FROM_HDFS_KEY,
- tableName + "," + pathOnHDFS1.toString() + "," + cpName1);
+ CoprocessorHost.genDfsPerTableProjectEntry(CP_NAME, 1, tableName));
// invoke online configuration change
HRegionServer.configurationManager.notifyAllObservers(conf);
@@ -169,8 +186,8 @@ public class TestClassLoading {
//now load the second coprocessor too
String current = conf.get(CoprocessorHost.USER_REGION_COPROCESSOR_FROM_HDFS_KEY);
conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_FROM_HDFS_KEY,
- current + ";" + tableName + "," + pathOnHDFS2.toString() + ","
- + cpName2);
+ current + ";"
+ + CoprocessorHost.genDfsPerTableProjectEntry(CP_NAME, 2, tableName));
// invoke online config change
HRegionServer.configurationManager.notifyAllObservers(conf);
allCoprocessors = RegionCoprocessorHost.getEverLoadedCoprocessors();
@@ -179,8 +196,8 @@ public class TestClassLoading {
expectedCoprocessorSimpleName.add(cpName2);
for (HRegion r : regions) {
Set<String> currentCoprocessors = r.getCoprocessorHost().getCoprocessors();
- assertTrue("Number of currently loaded coprocessors",
- currentCoprocessors.size() == 2);
+ assertEquals("Number of currently loaded coprocessors", 2,
+ currentCoprocessors.size());
assertEquals("Expected loaded coprocessors",
expectedCoprocessorSimpleName, currentCoprocessors);
}