You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/05/14 20:18:29 UTC
svn commit: r1594662 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/client/
main/java/org/apache/hadoop/hbase/coprocessor/
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/coprocessor/observers/ tes...
Author: liyin
Date: Wed May 14 18:18:28 2014
New Revision: 1594662
URL: http://svn.apache.org/r1594662
Log:
[HBASE-2000] load coprocessors from jar published in hdfs
Author: adela
Summary: on online configuration change - load jars from hdfs for coprocessors
Test Plan: added TestClassLoading, also ran TestHRegionObserverBypassCoprocessor and made sure it passes
Reviewers: daviddeng
Reviewed By: daviddeng
CC: hbase-eng@
Differential Revision: https://phabricator.fb.com/D1308323
Task ID: 4171555
Added:
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/coprocessor/ClassLoaderTestHelper.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/coprocessor/TestClassLoading.java
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java?rev=1594662&r1=1594661&r2=1594662&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java Wed May 14 18:18:28 2014
@@ -1065,7 +1065,7 @@ private HRegionLocation locateMetaInRoot
LOG.debug("IOException locateRegionInMeta attempt " + tries
+ " of " + params.getNumRetries()
+ " failed; retrying after sleep of "
- + params.getPauseTime(tries) + " because: " + e.getMessage());
+ + params.getPauseTime(tries) + " because: " + e.getMessage(), e);
} else {
throw e;
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java?rev=1594662&r1=1594661&r2=1594662&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java Wed May 14 18:18:28 2014
@@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.coproces
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
@@ -39,6 +41,7 @@ import org.apache.hadoop.hbase.DoNotRetr
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.ipc.ThriftClientInterface;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.util.coprocessor.CoprocessorClassLoader;
import org.apache.hadoop.hbase.util.coprocessor.SortedCopyOnWriteSet;
@@ -64,7 +67,7 @@ public abstract class CoprocessorHost<E
"hbase.coprocessor.wal.classes";
public static final String ABORT_ON_ERROR_KEY = "hbase.coprocessor.abortonerror";
public static final boolean DEFAULT_ABORT_ON_ERROR = true;
-
+ public static final String USER_REGION_COPROCESSOR_FROM_HDFS_KEY = "hbase.coprocessor.jars.and.classes";
private static final Log LOG = LogFactory.getLog(CoprocessorHost.class);
protected ThriftClientInterface tcInter;
@@ -125,9 +128,11 @@ public abstract class CoprocessorHost<E
List<E> configured = new ArrayList<E>();
for (String className : defaultCPClasses) {
className = className.trim();
- if (findCoprocessor(className) != null) {
- continue;
- }
+ // TODO: check if we want to reenable this behavior later!
+ // if (findCoprocessor(className) != null) {
+ // System.out.println("coprocessor found with name: " + className);
+ // continue;
+ // }
ClassLoader cl = this.getClass().getClassLoader();
Thread.currentThread().setContextClassLoader(cl);
try {
@@ -143,17 +148,91 @@ public abstract class CoprocessorHost<E
}
/**
- * Generally used when we do online configuration change for the loaded coprocessors
+ * Generally used when we do online configuration change for the loaded
+ * coprocessors
+ *
* @param conf
* @param confKey
*/
- protected void reloadSysCoprocessorsOnConfigChange(Configuration conf, String confKey) {
- //remove whatever is loaded already
- coprocessors.clear();
+ protected void reloadSysCoprocessorsOnConfigChange(Configuration conf,
+ String confKey) {
+ // remove whatever is loaded already
loadSystemCoprocessors(conf, confKey);
}
/**
+ * read coprocessors that should be loaded from configuration
+ *
+ * @param conf
+ */
+ private List<Pair<String, String>> readCoprocessorsFromConf(Configuration conf) {
+ List<Pair<String, String>> jarClass = new ArrayList<>();
+ Collection<String> jarsAndImpls = conf
+ .getStringCollection(USER_REGION_COPROCESSOR_FROM_HDFS_KEY);
+ String jar = null;
+ for (Iterator<String> iterator = jarsAndImpls.iterator(); iterator
+ .hasNext();) {
+ String string = (String) iterator.next();
+ if (string.endsWith(".jar")) {
+ if (checkIfCorrectPath(string)) {
+ jar = string;
+ } else {
+ LOG.error("jar " + string
+ + " is not placed on the correct path or path is incorrect!");
+ return null;
+ }
+ } else {
+ jarClass.add(new Pair<>(jar, string));
+ }
+ }
+ return jarClass;
+ }
+
+ /**
+ * Checks if the specified path for the coprocessor jar is in expected format
+ * TODO: make this more advanced - currently is very hardoced and dummy
+ *
+ * @param path
+ * - Absolute and complete path where the coprocessor jar resides
+ * Expected path is everything in this format:
+ * coprocessors/project/version(integer)/jarfile
+ * @return
+ */
+ public static boolean checkIfCorrectPath(String path) {
+ String[] firstSplit = path.split(":");
+ String[] parts = firstSplit[2].split("/");
+ // port is included here
+ if (parts.length != 5) {
+ return false;
+ }
+ if (!parts[1].equals("coprocessors")) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Used to load coprocessors whose jar is specified via hdfs path. All
+ * existing coprocessors will be unloaded (if they appear again in the new
+ * configuration then they will be re-added).
+ *
+ * @param config
+ * @throws IOException
+ */
+ protected void reloadCoprocessorsFromHdfs(Configuration config)
+ throws IOException {
+ List<Pair<String, String>> fromConf = this.readCoprocessorsFromConf(config);
+ List<E> newCoprocessors = new ArrayList<>();
+ for (Pair<String, String> pair : fromConf) {
+ System.out.println(pair.getFirst() + " " + pair.getSecond());
+ E coproc = load(new Path(pair.getFirst()), pair.getSecond(),
+ Coprocessor.PRIORITY_USER, config);
+ newCoprocessors.add(coproc);
+ }
+ this.coprocessors.addAll(newCoprocessors);
+ }
+
+ /**
* Load a coprocessor implementation into the host
* @param path path to implementation jar
* @param className the main class name
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java?rev=1594662&r1=1594661&r2=1594662&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java Wed May 14 18:18:28 2014
@@ -73,14 +73,16 @@ public class RegionCoprocessorHost exten
/**
* Used mainly when we dynamically reload the configuration
+ * @throws IOException
*/
- public void reloadCoprocessors(Configuration newConf) {
+ public void reloadCoprocessors(Configuration newConf) throws IOException {
+ coprocessors.clear();
// reload system default cp's from configuration.
reloadSysCoprocessorsOnConfigChange(newConf, REGION_COPROCESSOR_CONF_KEY);
// reload system default cp's for user tables from configuration.
- //TODO: check whether this checks for ROOT too
if (!region.getRegionInfo().getTableDesc().isMetaRegion()
&& !region.getRegionInfo().getTableDesc().isRootRegion()) {
+ reloadCoprocessorsFromHdfs(newConf);
reloadSysCoprocessorsOnConfigChange(newConf, USER_REGION_COPROCESSOR_CONF_KEY);
}
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1594662&r1=1594661&r2=1594662&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed May 14 18:18:28 2014
@@ -550,13 +550,21 @@ public class HRegion implements HeapSize
this.coprocessorHost = new RegionCoprocessorHost(this,
this.conf);
// initialize dynamic parameters with current configuration
- this.loadDynamicConf(conf);
+ try {
+ this.loadDynamicConf(conf);
+ } catch (IOException e) {
+ LOG.error("Was unable to load coprocessors from configuration", e);
+ }
}
@Override
public void notifyOnChange(Configuration conf) {
LOG.info("Online configuration changed!");
- this.loadDynamicConf(conf);
+ try {
+ this.loadDynamicConf(conf);
+ } catch (IOException e) {
+ LOG.error("Was unable to load coprocessors from configuration", e);
+ }
}
private static void logIfChange(String varName, long orgV, long newV) {
@@ -566,8 +574,9 @@ public class HRegion implements HeapSize
}
/**
* Load online configurable parameters from a specified Configuration
+ * @throws IOException
*/
- private void loadDynamicConf(Configuration conf) {
+ private void loadDynamicConf(Configuration conf) throws IOException {
long newColumnfamilyMemstoreFlushSize = conf.getLong(
HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE,
HTableDescriptor.DEFAULT_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE);
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java?rev=1594662&r1=1594661&r2=1594662&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java Wed May 14 18:18:28 2014
@@ -8,8 +8,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import junit.framework.Assert;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -28,6 +26,7 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/coprocessor/ClassLoaderTestHelper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/coprocessor/ClassLoaderTestHelper.java?rev=1594662&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/coprocessor/ClassLoaderTestHelper.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/coprocessor/ClassLoaderTestHelper.java Wed May 14 18:18:28 2014
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util.coprocessor;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.jar.JarEntry;
+import java.util.jar.JarOutputStream;
+import java.util.jar.Manifest;
+
+import javax.tools.JavaCompiler;
+import javax.tools.JavaFileObject;
+import javax.tools.StandardJavaFileManager;
+import javax.tools.ToolProvider;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Some utilities to help class loader testing
+ */
+public class ClassLoaderTestHelper {
+ private static final Log LOG = LogFactory.getLog(ClassLoaderTestHelper.class);
+
+ private static final int BUFFER_SIZE = 4096;
+
+ /**
+ * Jar a list of files into a jar archive.
+ *
+ * @param archiveFile the target jar archive
+ * @param tobejared a list of files to be jared
+ */
+ private static boolean createJarArchive(File archiveFile, File[] tobeJared) {
+ try {
+ byte buffer[] = new byte[BUFFER_SIZE];
+ // Open archive file
+ FileOutputStream stream = new FileOutputStream(archiveFile);
+ JarOutputStream out = new JarOutputStream(stream, new Manifest());
+
+ for (int i = 0; i < tobeJared.length; i++) {
+ if (tobeJared[i] == null || !tobeJared[i].exists()
+ || tobeJared[i].isDirectory()) {
+ continue;
+ }
+
+ // Add archive entry
+ JarEntry jarAdd = new JarEntry(tobeJared[i].getName());
+ jarAdd.setTime(tobeJared[i].lastModified());
+ out.putNextEntry(jarAdd);
+
+ // Write file to archive
+ FileInputStream in = new FileInputStream(tobeJared[i]);
+ while (true) {
+ int nRead = in.read(buffer, 0, buffer.length);
+ if (nRead <= 0)
+ break;
+ out.write(buffer, 0, nRead);
+ }
+ in.close();
+ }
+ out.close();
+ stream.close();
+ LOG.info("Adding classes to jar file completed");
+ return true;
+ } catch (Exception ex) {
+ LOG.error("Error: " + ex.getMessage());
+ return false;
+ }
+ }
+
+ /**
+ * Create a test jar for testing purpose for a given class
+ * name with specified code string: save the class to a file,
+ * compile it, and jar it up. If the code string passed in is
+ * null, a bare empty class will be created and used.
+ *
+ * @param testDir the folder under which to store the test class and jar
+ * @param className the test class name
+ * @param code the optional test class code, which can be null.
+ * If null, a bare empty class will be used
+ * @return the test jar file generated
+ */
+ public static File buildJar(String testDir,
+ String className, String code) throws Exception {
+ return buildJar(testDir, className, code, testDir);
+ }
+
+ /**
+ * Create a test jar for testing purpose for a given class
+ * name with specified code string.
+ *
+ * @param testDir the folder under which to store the test class
+ * @param className the test class name
+ * @param code the optional test class code, which can be null.
+ * If null, an empty class will be used
+ * @param folder the folder under which to store the generated jar
+ * @return the test jar file generated
+ */
+ public static File buildJar(String testDir,
+ String className, String code, String folder) throws Exception {
+ String javaCode = code != null ? code : "public class " + className + " {}";
+ Path srcDir = new Path(testDir, "src");
+ File srcDirPath = new File(srcDir.toString());
+ srcDirPath.mkdirs();
+ File sourceCodeFile = new File(srcDir.toString(), className + ".java");
+ BufferedWriter bw = new BufferedWriter(new FileWriter(sourceCodeFile));
+ bw.write(javaCode);
+ bw.close();
+
+ // compile it by JavaCompiler
+ JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
+ ArrayList<String> srcFileNames = new ArrayList<String>();
+ srcFileNames.add(sourceCodeFile.toString());
+ StandardJavaFileManager fm = compiler.getStandardFileManager(null, null,
+ null);
+ Iterable<? extends JavaFileObject> cu =
+ fm.getJavaFileObjects(sourceCodeFile);
+ List<String> options = new ArrayList<String>();
+ options.add("-classpath");
+ // only add hbase classes to classpath. This is a little bit tricky: assume
+ // the classpath is {hbaseSrc}/target/classes.
+ String currentDir = new File(".").getAbsolutePath();
+ String classpath = currentDir + File.separator + "target"+ File.separator
+ + "classes" + System.getProperty("path.separator")
+ + System.getProperty("java.class.path") + System.getProperty("path.separator")
+ + System.getProperty("surefire.test.class.path");
+
+ options.add(classpath);
+ LOG.debug("Setting classpath to: " + classpath);
+
+ JavaCompiler.CompilationTask task = compiler.getTask(null, fm, null,
+ options, null, cu);
+ assertTrue("Compile file " + sourceCodeFile + " failed.", task.call());
+
+ // build a jar file by the classes files
+ String jarFileName = className + ".jar";
+ File jarFile = new File(folder, jarFileName);
+ jarFile.getParentFile().mkdirs();
+ if (!createJarArchive(jarFile,
+ new File[]{new File(srcDir.toString(), className + ".class")})){
+ assertTrue("Build jar file failed.", false);
+ }
+ return jarFile;
+ }
+
+ /**
+ * Add a list of jar files to another jar file under a specific folder.
+ * It is used to generated coprocessor jar files which can be loaded by
+ * the coprocessor class loader. It is for testing usage only so we
+ * don't be so careful about stream closing in case any exception.
+ *
+ * @param targetJar the target jar file
+ * @param libPrefix the folder where to put inner jar files
+ * @param srcJars the source inner jar files to be added
+ * @throws Exception if anything doesn't work as expected
+ */
+ public static void addJarFilesToJar(File targetJar,
+ String libPrefix, File... srcJars) throws Exception {
+ FileOutputStream stream = new FileOutputStream(targetJar);
+ JarOutputStream out = new JarOutputStream(stream, new Manifest());
+ byte buffer[] = new byte[BUFFER_SIZE];
+
+ for (File jarFile: srcJars) {
+ // Add archive entry
+ JarEntry jarAdd = new JarEntry(libPrefix + jarFile.getName());
+ jarAdd.setTime(jarFile.lastModified());
+ out.putNextEntry(jarAdd);
+
+ // Write file to archive
+ FileInputStream in = new FileInputStream(jarFile);
+ while (true) {
+ int nRead = in.read(buffer, 0, buffer.length);
+ if (nRead <= 0)
+ break;
+ out.write(buffer, 0, nRead);
+ }
+ in.close();
+ }
+ out.close();
+ stream.close();
+ LOG.info("Adding jar file to outer jar file completed");
+ }
+
+ static String localDirPath(Configuration conf) {
+ return conf.get(CoprocessorClassLoader.LOCAL_DIR_KEY)
+ + File.separator + "jars" + File.separator;
+ }
+
+}
Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/coprocessor/TestClassLoading.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/coprocessor/TestClassLoading.java?rev=1594662&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/coprocessor/TestClassLoading.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/coprocessor/TestClassLoading.java Wed May 14 18:18:28 2014
@@ -0,0 +1,189 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.observers.TestHRegionObserverBypassCoprocessor.TestCoprocessor;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+* Test coprocessors class loading.
+*/
+public class TestClassLoading {
+ private static final Log LOG = LogFactory.getLog(TestClassLoading.class);
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static final byte[] DUMMY = Bytes.toBytes("dummy");
+ private static final byte[] TEST = Bytes.toBytes("test");
+
+ private static MiniDFSCluster cluster;
+
+ static final String tableName = "TestClassLoading";
+ static final String cpName1 = "TestCP1";
+ static final String cpName2 = "TestCP2";
+
+ private static Class<?> testCoprocessor = TestCoprocessor.class;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ // load TestCoprocessor in the beginning
+ conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
+ testCoprocessor.getName());
+ TEST_UTIL.startMiniCluster(1);
+ cluster = TEST_UTIL.getDFSCluster();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+ if (admin.tableExists(tableName)) {
+ if (admin.isTableEnabled(tableName)) {
+ admin.disableTable(tableName);
+ }
+ admin.deleteTable(tableName);
+ }
+ TEST_UTIL.createTable(Bytes.toBytes(tableName),
+ new byte[][] { DUMMY, TEST });
+ }
+
+ static File buildCoprocessorJar(String className) throws Exception {
+ String code = "import org.apache.hadoop.hbase.coprocessor.observers.BaseRegionObserver;\n"
+ + "public class " + className + " extends BaseRegionObserver {}";
+ return ClassLoaderTestHelper.buildJar(TEST_UTIL.getDFSCluster()
+ .getDataDirectory().toString(), className, code);
+ }
+
+ public static String buildCorrectPathForCoprocessorJar(String dataDir) {
+ System.out.println("dataDir: " + dataDir);
+ String newStr = dataDir + File.separator+"coprocessors" + File.separator+"test" + File.separator + "1";
+ System.out.println(newStr);
+ return newStr;
+ }
+
+ @Test
+ public void testClassLoadingFromHDFS() throws Exception {
+ FileSystem fs = cluster.getFileSystem();
+ File jarFile1 = buildCoprocessorJar(cpName1);
+ System.out.println(jarFile1.getName());
+ File jarFile2 = buildCoprocessorJar(cpName2);
+ // have to create directories because we are not placing the files on hdfs
+ // root
+ assertTrue(fs.mkdirs(new Path("/coprocessors/test/1")));
+ // copy the jars into dfs
+ fs.moveFromLocalFile(new Path(jarFile1.getPath()), new Path(fs.getUri()
+ .toString(), buildCorrectPathForCoprocessorJar("") + Path.SEPARATOR));
+
+ String jarFileOnHDFS1 = buildCorrectPathForCoprocessorJar(fs.getUri()
+ .toString()) + Path.SEPARATOR + jarFile1.getName();
+ Path pathOnHDFS1 = new Path(jarFileOnHDFS1);
+ System.out.println(jarFileOnHDFS1);
+ assertTrue("Copy jar file to HDFS failed.", fs.exists(pathOnHDFS1));
+ LOG.info("Copied jar file to HDFS: " + jarFileOnHDFS1);
+
+ fs.moveFromLocalFile(new Path(jarFile2.getPath()), new Path(fs.getUri()
+ .toString(), buildCorrectPathForCoprocessorJar("") + Path.SEPARATOR));
+
+ String jarFileOnHDFS2 = buildCorrectPathForCoprocessorJar(fs.getUri()
+ .toString()) + Path.SEPARATOR + jarFile2.getName();
+ Path pathOnHDFS2 = new Path(jarFileOnHDFS2);
+ assertTrue("Copy jar file to HDFS failed.", fs.exists(pathOnHDFS2));
+ LOG.info("Copied jar file to HDFS: " + jarFileOnHDFS2);
+ Configuration conf = TEST_UTIL.getConfiguration();
+
+ // check if only TestCoprocessor is currently loaded
+ List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegions(Bytes.toBytes(tableName));
+ Set<String> expectedCoprocessorSimpleName = new HashSet<>();
+ Set<String> allCoprocessors = RegionCoprocessorHost
+ .getEverLoadedCoprocessors();
+ assertEquals("Number of coprocessors ever loaded", 1,
+ allCoprocessors.size());
+ assertEquals("Expected loaded coprocessor",
+ TestCoprocessor.class.getName(), allCoprocessors.toArray()[0]);
+ // do online config change and confirm the new coprocessor is loaded
+ CoprocessorClassLoader.clearCache();
+ // remove the firstly added coprocessor
+ conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, "");
+ conf.set(CoprocessorHost.USER_REGION_COPROCESSOR_FROM_HDFS_KEY,
+ pathOnHDFS1 + "," + cpName1);
+
+ // invoke online configuration change
+ HRegionServer.configurationManager.notifyAllObservers(conf);
+ // check everloaded coprocessors
+ allCoprocessors = RegionCoprocessorHost.getEverLoadedCoprocessors();
+ assertEquals("Number of coprocessors ever loaded", 2,
+ allCoprocessors.size());
+ expectedCoprocessorSimpleName.add(cpName1);
+
+ for (HRegion r : regions) {
+ Set<String> currentCoprocessors = r.getCoprocessorHost()
+ .getCoprocessors();
+ assertEquals("Number of current coprocessors", 1,
+ currentCoprocessors.size());
+ assertEquals("Expected loaded coprocessors",
+ expectedCoprocessorSimpleName, currentCoprocessors);
+ }
+ //now load the second coprocessor too
+ String current = conf.get(CoprocessorHost.USER_REGION_COPROCESSOR_FROM_HDFS_KEY);
+ current +="," + pathOnHDFS2+"," + cpName2;
+ conf.set(CoprocessorHost.USER_REGION_COPROCESSOR_FROM_HDFS_KEY, current);
+ // invoke online config change
+ HRegionServer.configurationManager.notifyAllObservers(conf);
+ allCoprocessors = RegionCoprocessorHost.getEverLoadedCoprocessors();
+ assertEquals("Number of ever loaded coprocessors", 3,
+ allCoprocessors.size());
+ expectedCoprocessorSimpleName.add(cpName2);
+ for (HRegion r : regions) {
+ Set<String> currentCoprocessors = r.getCoprocessorHost().getCoprocessors();
+ assertTrue("Number of currently loaded coprocessors",
+ currentCoprocessors.size() == 2);
+ assertEquals("Expected loaded coprocessors",
+ expectedCoprocessorSimpleName, currentCoprocessors);
+ }
+ }
+
+}