You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/10/20 08:44:02 UTC
[1/3] kylin git commit: KYLIN-2109 minor,
reformat DeployCoprocessorCLI to Unix line endings
Repository: kylin
Updated Branches:
refs/heads/master eef157c7d -> 9cc953273
KYLIN-2109 minor, reformat DeployCoprocessorCLI to Unix line endings
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/9cc95327
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/9cc95327
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/9cc95327
Branch: refs/heads/master
Commit: 9cc953273aa5f90281bd688d0a02092e144c4141
Parents: c7a48dd
Author: Li Yang <li...@apache.org>
Authored: Thu Oct 20 16:43:37 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Thu Oct 20 16:43:51 2016 +0800
----------------------------------------------------------------------
.../hbase/util/DeployCoprocessorCLI.java | 763 ++++++++++---------
1 file changed, 383 insertions(+), 380 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/9cc95327/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index f2618dc..8f69c18 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -1,197 +1,197 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.util;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.util;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.regex.Matcher;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.KylinVersion;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.realization.IRealizationConstants;
-import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-public class DeployCoprocessorCLI {
-
- public static final String CubeObserverClass = "org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.AggregateRegionObserver";
- public static final String CubeEndpointClass = "org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.CubeVisitService";
- public static final String CubeObserverClassOld = "org.apache.kylin.storage.hbase.coprocessor.observer.AggregateRegionObserver";
- public static final String IIEndpointClassOld = "org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint";
- public static final String IIEndpointClass = "org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.IIEndpoint";
- private static KylinConfig kylinConfig;
-
- private static final Logger logger = LoggerFactory.getLogger(DeployCoprocessorCLI.class);
-
- public static void main(String[] args) throws IOException {
-
- if (args == null || args.length <= 1) {
- printUsageAndExit();
- }
-
- kylinConfig = KylinConfig.getInstanceFromEnv();
- Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
- FileSystem fileSystem = FileSystem.get(hconf);
- HBaseAdmin hbaseAdmin = new HBaseAdmin(hconf);
-
- String localCoprocessorJar;
- if ("default".equals(args[0])) {
- localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
- } else {
- localCoprocessorJar = new File(args[0]).getAbsolutePath();
- }
-
- logger.info("Identify coprocessor jar " + localCoprocessorJar);
-
- List<String> tableNames = getHTableNames(kylinConfig);
- logger.info("Identify tables " + tableNames);
-
- String filterType = args[1].toLowerCase();
- if (filterType.equals("-table")) {
- tableNames = filterByTables(tableNames, Arrays.asList(args).subList(2, args.length));
- } else if (filterType.equals("-cube")) {
- tableNames = filterByCubes(tableNames, Arrays.asList(args).subList(2, args.length));
- } else if (!filterType.equals("all")) {
- printUsageAndExit();
- }
-
- logger.info("Will execute tables " + tableNames);
-
- Set<String> oldJarPaths = getCoprocessorJarPaths(hbaseAdmin, tableNames);
- logger.info("Old coprocessor jar: " + oldJarPaths);
-
- Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths);
- logger.info("New coprocessor jar: " + hdfsCoprocessorJar);
-
- resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames);
-
- // Don't remove old jars, missing coprocessor jar will fail hbase
- // removeOldJars(oldJarPaths, fileSystem);
-
- hbaseAdmin.close();
-
+import java.util.regex.Matcher;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinVersion;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ */
+public class DeployCoprocessorCLI {
+
+ public static final String CubeObserverClass = "org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.AggregateRegionObserver";
+ public static final String CubeEndpointClass = "org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.CubeVisitService";
+ public static final String CubeObserverClassOld = "org.apache.kylin.storage.hbase.coprocessor.observer.AggregateRegionObserver";
+ public static final String IIEndpointClassOld = "org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint";
+ public static final String IIEndpointClass = "org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.IIEndpoint";
+
+ private static final Logger logger = LoggerFactory.getLogger(DeployCoprocessorCLI.class);
+
+ public static void main(String[] args) throws IOException {
+
+ if (args == null || args.length <= 1) {
+ printUsageAndExit();
+ }
+
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
+ FileSystem fileSystem = FileSystem.get(hconf);
+ HBaseAdmin hbaseAdmin = new HBaseAdmin(hconf);
+
+ String localCoprocessorJar;
+ if ("default".equals(args[0])) {
+ localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
+ } else {
+ localCoprocessorJar = new File(args[0]).getAbsolutePath();
+ }
+
+ logger.info("Identify coprocessor jar " + localCoprocessorJar);
+
+ List<String> tableNames = getHTableNames(kylinConfig);
+ logger.info("Identify tables " + tableNames);
+
+ String filterType = args[1].toLowerCase();
+ if (filterType.equals("-table")) {
+ tableNames = filterByTables(tableNames, Arrays.asList(args).subList(2, args.length));
+ } else if (filterType.equals("-cube")) {
+ tableNames = filterByCubes(tableNames, Arrays.asList(args).subList(2, args.length));
+ } else if (!filterType.equals("all")) {
+ printUsageAndExit();
+ }
+
+ logger.info("Will execute tables " + tableNames);
+
+ Set<String> oldJarPaths = getCoprocessorJarPaths(hbaseAdmin, tableNames);
+ logger.info("Old coprocessor jar: " + oldJarPaths);
+
+ Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths);
+ logger.info("New coprocessor jar: " + hdfsCoprocessorJar);
+
+ List<String> processedTables = resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames);
+
+ // Don't remove old jars, missing coprocessor jar will fail hbase
+ // removeOldJars(oldJarPaths, fileSystem);
+
+ hbaseAdmin.close();
+
logger.info("Processed tables count: " + processedTables.size());
logger.info("Processed tables: " + processedTables);
- logger.info("Active coprocessor jar: " + hdfsCoprocessorJar);
- }
-
- private static void printUsageAndExit() {
- logger.info("Probe run, exiting. Append argument 'all' or specific tables/cubes to execute.");
- System.exit(0);
- }
-
- private static List<String> filterByCubes(List<String> allTableNames, List<String> cubeNames) {
- CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- List<String> result = Lists.newArrayList();
- for (String c : cubeNames) {
- c = c.trim();
- if (c.endsWith(","))
- c = c.substring(0, c.length() - 1);
-
- CubeInstance cubeInstance = cubeManager.getCube(c);
- for (CubeSegment segment : cubeInstance.getSegments()) {
- String tableName = segment.getStorageLocationIdentifier();
- if (allTableNames.contains(tableName)) {
- result.add(tableName);
- }
- }
- }
- return result;
- }
-
- private static List<String> filterByTables(List<String> allTableNames, List<String> tableNames) {
- List<String> result = Lists.newArrayList();
- for (String t : tableNames) {
- t = t.trim();
- if (t.endsWith(","))
- t = t.substring(0, t.length() - 1);
-
- if (allTableNames.contains(t)) {
- result.add(t);
- }
- }
- return result;
- }
-
- public static void deployCoprocessor(HTableDescriptor tableDesc) {
- try {
- initHTableCoprocessor(tableDesc);
- logger.info("hbase table " + tableDesc.getName() + " deployed with coprocessor.");
-
- } catch (Exception ex) {
- logger.error("Error deploying coprocessor on " + tableDesc.getName(), ex);
- logger.error("Will try creating the table without coprocessor.");
- }
- }
-
- private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
- FileSystem fileSystem = FileSystem.get(hconf);
-
- String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
- Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, null);
-
- DeployCoprocessorCLI.addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
- }
-
- public static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException {
- logger.info("Add coprocessor on " + desc.getNameAsString());
- desc.addCoprocessor(CubeEndpointClass, hdfsCoprocessorJar, 1001, null);
- desc.addCoprocessor(CubeObserverClass, hdfsCoprocessorJar, 1002, null);
- }
-
+ logger.info("Active coprocessor jar: " + hdfsCoprocessorJar);
+ }
+
+ private static void printUsageAndExit() {
+ logger.info("Probe run, exiting. Append argument 'all' or specific tables/cubes to execute.");
+ System.exit(0);
+ }
+
+ private static List<String> filterByCubes(List<String> allTableNames, List<String> cubeNames) {
+ CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+ List<String> result = Lists.newArrayList();
+ for (String c : cubeNames) {
+ c = c.trim();
+ if (c.endsWith(","))
+ c = c.substring(0, c.length() - 1);
+
+ CubeInstance cubeInstance = cubeManager.getCube(c);
+ for (CubeSegment segment : cubeInstance.getSegments()) {
+ String tableName = segment.getStorageLocationIdentifier();
+ if (allTableNames.contains(tableName)) {
+ result.add(tableName);
+ }
+ }
+ }
+ return result;
+ }
+
+ private static List<String> filterByTables(List<String> allTableNames, List<String> tableNames) {
+ List<String> result = Lists.newArrayList();
+ for (String t : tableNames) {
+ t = t.trim();
+ if (t.endsWith(","))
+ t = t.substring(0, t.length() - 1);
+
+ if (allTableNames.contains(t)) {
+ result.add(t);
+ }
+ }
+ return result;
+ }
+
+ public static void deployCoprocessor(HTableDescriptor tableDesc) {
+ try {
+ initHTableCoprocessor(tableDesc);
+ logger.info("hbase table " + tableDesc.getName() + " deployed with coprocessor.");
+
+ } catch (Exception ex) {
+ logger.error("Error deploying coprocessor on " + tableDesc.getName(), ex);
+ logger.error("Will try creating the table without coprocessor.");
+ }
+ }
+
+ private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
+ FileSystem fileSystem = FileSystem.get(hconf);
+
+ String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
+ Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, null);
+
+ DeployCoprocessorCLI.addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
+ }
+
+ public static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException {
+ logger.info("Add coprocessor on " + desc.getNameAsString());
+ desc.addCoprocessor(CubeEndpointClass, hdfsCoprocessorJar, 1001, null);
+ desc.addCoprocessor(CubeObserverClass, hdfsCoprocessorJar, 1002, null);
+ }
+
public static boolean resetCoprocessor(String tableName, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException {
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
//when the table has migrated from dev env to test(prod) env, the dev server
@@ -204,49 +204,49 @@ public class DeployCoprocessorCLI {
logger.info("reset coprocessor on " + tableName);
- logger.info("Disable " + tableName);
- hbaseAdmin.disableTable(tableName);
-
- while (desc.hasCoprocessor(CubeObserverClass)) {
- desc.removeCoprocessor(CubeObserverClass);
- }
- while (desc.hasCoprocessor(CubeEndpointClass)) {
- desc.removeCoprocessor(CubeEndpointClass);
- }
- while (desc.hasCoprocessor(IIEndpointClass)) {
- desc.removeCoprocessor(IIEndpointClass);
- }
- // remove legacy coprocessor from v1.x
- while (desc.hasCoprocessor(CubeObserverClassOld)) {
- desc.removeCoprocessor(CubeObserverClassOld);
- }
- while (desc.hasCoprocessor(IIEndpointClassOld)) {
- desc.removeCoprocessor(IIEndpointClassOld);
- }
- addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
-
- // update commit tags
- String commitInfo = KylinVersion.getGitCommitInfo();
- if (!StringUtils.isEmpty(commitInfo)) {
- desc.setValue(IRealizationConstants.HTableGitTag, commitInfo);
- }
-
- hbaseAdmin.modifyTable(tableName, desc);
-
- logger.info("Enable " + tableName);
- hbaseAdmin.enableTable(tableName);
+ logger.info("Disable " + tableName);
+ hbaseAdmin.disableTable(tableName);
+
+ while (desc.hasCoprocessor(CubeObserverClass)) {
+ desc.removeCoprocessor(CubeObserverClass);
+ }
+ while (desc.hasCoprocessor(CubeEndpointClass)) {
+ desc.removeCoprocessor(CubeEndpointClass);
+ }
+ while (desc.hasCoprocessor(IIEndpointClass)) {
+ desc.removeCoprocessor(IIEndpointClass);
+ }
+ // remove legacy coprocessor from v1.x
+ while (desc.hasCoprocessor(CubeObserverClassOld)) {
+ desc.removeCoprocessor(CubeObserverClassOld);
+ }
+ while (desc.hasCoprocessor(IIEndpointClassOld)) {
+ desc.removeCoprocessor(IIEndpointClassOld);
+ }
+ addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
+
+ // update commit tags
+ String commitInfo = KylinVersion.getGitCommitInfo();
+ if (!StringUtils.isEmpty(commitInfo)) {
+ desc.setValue(IRealizationConstants.HTableGitTag, commitInfo);
+ }
+
+ hbaseAdmin.modifyTable(tableName, desc);
+
+ logger.info("Enable " + tableName);
+ hbaseAdmin.enableTable(tableName);
return true;
- }
-
- private static List<String> processedTables = Collections.synchronizedList(new ArrayList<String>());
-
- private static void resetCoprocessorOnHTables(final HBaseAdmin hbaseAdmin, final Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
+ }
+
+
+ private static List<String> resetCoprocessorOnHTables(final HBaseAdmin hbaseAdmin, final Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
+ List<String> processedTables = Collections.synchronizedList(new ArrayList<String>());
ExecutorService coprocessorPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
CountDownLatch countDownLatch = new CountDownLatch(tableNames.size());
for (final String tableName : tableNames) {
- coprocessorPool.execute(new ResetCoprocessorWorker(countDownLatch, hbaseAdmin, hdfsCoprocessorJar, tableName));
+ coprocessorPool.execute(new ResetCoprocessorWorker(countDownLatch, hbaseAdmin, hdfsCoprocessorJar, tableName, processedTables));
}
try {
@@ -256,6 +256,7 @@ public class DeployCoprocessorCLI {
}
coprocessorPool.shutdown();
+ return processedTables;
}
private static class ResetCoprocessorWorker implements Runnable {
@@ -263,175 +264,177 @@ public class DeployCoprocessorCLI {
private final HBaseAdmin hbaseAdmin;
private final Path hdfsCoprocessorJar;
private final String tableName;
+ private final List<String> processedTables;
- public ResetCoprocessorWorker(CountDownLatch countDownLatch, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, String tableName) {
+ public ResetCoprocessorWorker(CountDownLatch countDownLatch, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, String tableName, List<String> processedTables) {
this.countDownLatch = countDownLatch;
this.hbaseAdmin = hbaseAdmin;
this.hdfsCoprocessorJar = hdfsCoprocessorJar;
this.tableName = tableName;
+ this.processedTables = processedTables;
}
@Override
public void run() {
- try {
+ try {
boolean isProcessed = resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar);
if (isProcessed) {
processedTables.add(tableName);
}
} catch (Exception ex) {
- logger.error("Error processing " + tableName, ex);
+ logger.error("Error processing " + tableName, ex);
} finally {
countDownLatch.countDown();
- }
-
- }
- }
-
- public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException {
- Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, config);
- FileStatus newestJar = null;
- for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
- if (fileStatus.getPath().toString().endsWith(".jar")) {
- if (newestJar == null) {
- newestJar = fileStatus;
- } else {
- if (newestJar.getModificationTime() < fileStatus.getModificationTime())
- newestJar = fileStatus;
- }
- }
- }
- if (newestJar == null)
- return null;
-
- Path path = newestJar.getPath().makeQualified(fileSystem.getUri(), null);
- logger.info("The newest coprocessor is " + path.toString());
- return path;
- }
-
- public synchronized static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, Set<String> oldJarPaths) throws IOException {
- Path uploadPath = null;
- File localCoprocessorFile = new File(localCoprocessorJar);
-
- // check existing jars
- if (oldJarPaths == null) {
- oldJarPaths = new HashSet<String>();
- }
- Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv());
- for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
- if (isSame(localCoprocessorFile, fileStatus)) {
- uploadPath = fileStatus.getPath();
- break;
- }
- String filename = fileStatus.getPath().toString();
- if (filename.endsWith(".jar")) {
- oldJarPaths.add(filename);
- }
- }
-
- // upload if not existing
- if (uploadPath == null) {
- // figure out a unique new jar file name
- Set<String> oldJarNames = new HashSet<String>();
- for (String path : oldJarPaths) {
- oldJarNames.add(new Path(path).getName());
- }
- String baseName = getBaseFileName(localCoprocessorJar);
- String newName = null;
- int i = 0;
- while (newName == null) {
- newName = baseName + "-" + (i++) + ".jar";
- if (oldJarNames.contains(newName))
- newName = null;
- }
-
- // upload
- uploadPath = new Path(coprocessorDir, newName);
- FileInputStream in = null;
- FSDataOutputStream out = null;
- try {
- in = new FileInputStream(localCoprocessorFile);
- out = fileSystem.create(uploadPath);
- IOUtils.copy(in, out);
- } finally {
- IOUtils.closeQuietly(in);
- IOUtils.closeQuietly(out);
- }
-
- fileSystem.setTimes(uploadPath, localCoprocessorFile.lastModified(), -1);
-
- }
-
- uploadPath = uploadPath.makeQualified(fileSystem.getUri(), null);
- return uploadPath;
- }
-
- private static boolean isSame(File localCoprocessorFile, FileStatus fileStatus) {
- return fileStatus.getLen() == localCoprocessorFile.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified();
- }
-
- private static String getBaseFileName(String localCoprocessorJar) {
- File localJar = new File(localCoprocessorJar);
- String baseName = localJar.getName();
- if (baseName.endsWith(".jar"))
- baseName = baseName.substring(0, baseName.length() - ".jar".length());
- return baseName;
- }
-
- private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException {
- String hdfsWorkingDirectory = config.getHdfsWorkingDirectory();
- Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor");
- fileSystem.mkdirs(coprocessorDir);
- return coprocessorDir;
- }
-
- private static Set<String> getCoprocessorJarPaths(HBaseAdmin hbaseAdmin, List<String> tableNames) throws IOException {
- HashSet<String> result = new HashSet<String>();
-
- for (String tableName : tableNames) {
- HTableDescriptor tableDescriptor = null;
- try {
- tableDescriptor = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
- } catch (TableNotFoundException e) {
- logger.warn("Table not found " + tableName, e);
- continue;
- }
-
- Matcher keyMatcher;
- Matcher valueMatcher;
- for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e : tableDescriptor.getValues().entrySet()) {
- keyMatcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(e.getKey().get()));
- if (!keyMatcher.matches()) {
- continue;
- }
- valueMatcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(Bytes.toString(e.getValue().get()));
- if (!valueMatcher.matches()) {
- continue;
- }
-
- String jarPath = valueMatcher.group(1).trim();
- if (StringUtils.isNotEmpty(jarPath)) {
- result.add(jarPath);
- }
- }
- }
-
- return result;
- }
-
- private static List<String> getHTableNames(KylinConfig config) {
- CubeManager cubeMgr = CubeManager.getInstance(config);
-
- ArrayList<String> result = new ArrayList<String>();
- for (CubeInstance cube : cubeMgr.listAllCubes()) {
- for (CubeSegment seg : cube.getSegments(SegmentStatusEnum.READY)) {
- String tableName = seg.getStorageLocationIdentifier();
- if (StringUtils.isBlank(tableName) == false) {
- result.add(tableName);
- System.out.println("added new table: " + tableName);
- }
- }
- }
-
- return result;
- }
-}
+ }
+
+ }
+ }
+
+ public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException {
+ Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, config);
+ FileStatus newestJar = null;
+ for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
+ if (fileStatus.getPath().toString().endsWith(".jar")) {
+ if (newestJar == null) {
+ newestJar = fileStatus;
+ } else {
+ if (newestJar.getModificationTime() < fileStatus.getModificationTime())
+ newestJar = fileStatus;
+ }
+ }
+ }
+ if (newestJar == null)
+ return null;
+
+ Path path = newestJar.getPath().makeQualified(fileSystem.getUri(), null);
+ logger.info("The newest coprocessor is " + path.toString());
+ return path;
+ }
+
+ public synchronized static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, Set<String> oldJarPaths) throws IOException {
+ Path uploadPath = null;
+ File localCoprocessorFile = new File(localCoprocessorJar);
+
+ // check existing jars
+ if (oldJarPaths == null) {
+ oldJarPaths = new HashSet<String>();
+ }
+ Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv());
+ for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
+ if (isSame(localCoprocessorFile, fileStatus)) {
+ uploadPath = fileStatus.getPath();
+ break;
+ }
+ String filename = fileStatus.getPath().toString();
+ if (filename.endsWith(".jar")) {
+ oldJarPaths.add(filename);
+ }
+ }
+
+ // upload if not existing
+ if (uploadPath == null) {
+ // figure out a unique new jar file name
+ Set<String> oldJarNames = new HashSet<String>();
+ for (String path : oldJarPaths) {
+ oldJarNames.add(new Path(path).getName());
+ }
+ String baseName = getBaseFileName(localCoprocessorJar);
+ String newName = null;
+ int i = 0;
+ while (newName == null) {
+ newName = baseName + "-" + (i++) + ".jar";
+ if (oldJarNames.contains(newName))
+ newName = null;
+ }
+
+ // upload
+ uploadPath = new Path(coprocessorDir, newName);
+ FileInputStream in = null;
+ FSDataOutputStream out = null;
+ try {
+ in = new FileInputStream(localCoprocessorFile);
+ out = fileSystem.create(uploadPath);
+ IOUtils.copy(in, out);
+ } finally {
+ IOUtils.closeQuietly(in);
+ IOUtils.closeQuietly(out);
+ }
+
+ fileSystem.setTimes(uploadPath, localCoprocessorFile.lastModified(), -1);
+
+ }
+
+ uploadPath = uploadPath.makeQualified(fileSystem.getUri(), null);
+ return uploadPath;
+ }
+
+ private static boolean isSame(File localCoprocessorFile, FileStatus fileStatus) {
+ return fileStatus.getLen() == localCoprocessorFile.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified();
+ }
+
+ private static String getBaseFileName(String localCoprocessorJar) {
+ File localJar = new File(localCoprocessorJar);
+ String baseName = localJar.getName();
+ if (baseName.endsWith(".jar"))
+ baseName = baseName.substring(0, baseName.length() - ".jar".length());
+ return baseName;
+ }
+
+ private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException {
+ String hdfsWorkingDirectory = config.getHdfsWorkingDirectory();
+ Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor");
+ fileSystem.mkdirs(coprocessorDir);
+ return coprocessorDir;
+ }
+
+ private static Set<String> getCoprocessorJarPaths(HBaseAdmin hbaseAdmin, List<String> tableNames) throws IOException {
+ HashSet<String> result = new HashSet<String>();
+
+ for (String tableName : tableNames) {
+ HTableDescriptor tableDescriptor = null;
+ try {
+ tableDescriptor = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+ } catch (TableNotFoundException e) {
+ logger.warn("Table not found " + tableName, e);
+ continue;
+ }
+
+ Matcher keyMatcher;
+ Matcher valueMatcher;
+ for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e : tableDescriptor.getValues().entrySet()) {
+ keyMatcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(e.getKey().get()));
+ if (!keyMatcher.matches()) {
+ continue;
+ }
+ valueMatcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(Bytes.toString(e.getValue().get()));
+ if (!valueMatcher.matches()) {
+ continue;
+ }
+
+ String jarPath = valueMatcher.group(1).trim();
+ if (StringUtils.isNotEmpty(jarPath)) {
+ result.add(jarPath);
+ }
+ }
+ }
+
+ return result;
+ }
+
+ private static List<String> getHTableNames(KylinConfig config) {
+ CubeManager cubeMgr = CubeManager.getInstance(config);
+
+ ArrayList<String> result = new ArrayList<String>();
+ for (CubeInstance cube : cubeMgr.listAllCubes()) {
+ for (CubeSegment seg : cube.getSegments(SegmentStatusEnum.READY)) {
+ String tableName = seg.getStorageLocationIdentifier();
+ if (StringUtils.isBlank(tableName) == false) {
+ result.add(tableName);
+ System.out.println("added new table: " + tableName);
+ }
+ }
+ }
+
+ return result;
+ }
+}
[2/3] kylin git commit: KYLIN-2109 Deploy coprocessor only if this
server owns the table
Posted by li...@apache.org.
KYLIN-2109 Deploy coprocessor only if this server owns the table
Signed-off-by: Li Yang <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c7a48dd9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c7a48dd9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c7a48dd9
Branch: refs/heads/master
Commit: c7a48dd93f7733b6ddf267e8b8f56ed7153a280c
Parents: b7e8065
Author: kangkaisen <ka...@live.com>
Authored: Wed Oct 19 15:45:19 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Thu Oct 20 16:43:51 2016 +0800
----------------------------------------------------------------------
.../hbase/util/DeployCoprocessorCLI.java | 28 +++++++++++++++-----
1 file changed, 22 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/c7a48dd9/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index cc9b988..f2618dc 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -69,6 +69,8 @@ public class DeployCoprocessorCLI {
public static final String CubeObserverClassOld = "org.apache.kylin.storage.hbase.coprocessor.observer.AggregateRegionObserver";
public static final String IIEndpointClassOld = "org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint";
public static final String IIEndpointClass = "org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.IIEndpoint";
+ private static KylinConfig kylinConfig;
+
private static final Logger logger = LoggerFactory.getLogger(DeployCoprocessorCLI.class);
public static void main(String[] args) throws IOException {
@@ -77,7 +79,7 @@ public class DeployCoprocessorCLI {
printUsageAndExit();
}
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ kylinConfig = KylinConfig.getInstanceFromEnv();
Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
FileSystem fileSystem = FileSystem.get(hconf);
HBaseAdmin hbaseAdmin = new HBaseAdmin(hconf);
@@ -189,12 +191,22 @@ public class DeployCoprocessorCLI {
desc.addCoprocessor(CubeObserverClass, hdfsCoprocessorJar, 1002, null);
}
- public static void resetCoprocessor(String tableName, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException {
+ public static boolean resetCoprocessor(String tableName, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException {
+ HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+
+ //when the table has migrated from dev env to test(prod) env, the dev server
+ //should not reset the coprocessor of the table.
+ String host = desc.getValue(IRealizationConstants.HTableTag);
+ if (!host.equalsIgnoreCase(kylinConfig.getMetadataUrlPrefix())) {
+ logger.warn("This server doesn't own this table: " + tableName);
+ return false;
+ }
+
+ logger.info("reset coprocessor on " + tableName);
+
logger.info("Disable " + tableName);
hbaseAdmin.disableTable(tableName);
- logger.info("Unset coprocessor on " + tableName);
- HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
while (desc.hasCoprocessor(CubeObserverClass)) {
desc.removeCoprocessor(CubeObserverClass);
}
@@ -223,6 +235,8 @@ public class DeployCoprocessorCLI {
logger.info("Enable " + tableName);
hbaseAdmin.enableTable(tableName);
+
+ return true;
}
private static List<String> processedTables = Collections.synchronizedList(new ArrayList<String>());
@@ -260,8 +274,10 @@ public class DeployCoprocessorCLI {
@Override
public void run() {
try {
- resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar);
- processedTables.add(tableName);
+ boolean isProcessed = resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar);
+ if (isProcessed) {
+ processedTables.add(tableName);
+ }
} catch (Exception ex) {
logger.error("Error processing " + tableName, ex);
} finally {
[3/3] kylin git commit: KYLIN-2089 Make update HBase coprocessor
concurrent
Posted by li...@apache.org.
KYLIN-2089 Make update HBase coprocessor concurrent
Signed-off-by: Li Yang <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b7e8065c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b7e8065c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b7e8065c
Branch: refs/heads/master
Commit: b7e8065c0b44eb45ec11fd3b498fd72652782b84
Parents: eef157c
Author: kangkaisen <ka...@live.com>
Authored: Wed Oct 12 20:12:15 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Thu Oct 20 16:43:51 2016 +0800
----------------------------------------------------------------------
.../hbase/util/DeployCoprocessorCLI.java | 55 ++++++++++++++++----
1 file changed, 46 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/b7e8065c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index a1193e7..cc9b988 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -23,10 +23,14 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.regex.Matcher;
import org.apache.commons.io.IOUtils;
@@ -107,14 +111,15 @@ public class DeployCoprocessorCLI {
Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths);
logger.info("New coprocessor jar: " + hdfsCoprocessorJar);
- List<String> processedTables = resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames);
+ resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames);
// Don't remove old jars, missing coprocessor jar will fail hbase
// removeOldJars(oldJarPaths, fileSystem);
hbaseAdmin.close();
- logger.info("Processed " + processedTables);
+ logger.info("Processed tables count: " + processedTables.size());
+ logger.info("Processed tables: " + processedTables);
logger.info("Active coprocessor jar: " + hdfsCoprocessorJar);
}
@@ -220,18 +225,50 @@ public class DeployCoprocessorCLI {
hbaseAdmin.enableTable(tableName);
}
- private static List<String> resetCoprocessorOnHTables(HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
- List<String> processed = new ArrayList<String>();
-
- for (String tableName : tableNames) {
+ private static List<String> processedTables = Collections.synchronizedList(new ArrayList<String>());
+
+ private static void resetCoprocessorOnHTables(final HBaseAdmin hbaseAdmin, final Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
+ ExecutorService coprocessorPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
+ CountDownLatch countDownLatch = new CountDownLatch(tableNames.size());
+
+ for (final String tableName : tableNames) {
+ coprocessorPool.execute(new ResetCoprocessorWorker(countDownLatch, hbaseAdmin, hdfsCoprocessorJar, tableName));
+ }
+
+ try {
+ countDownLatch.await();
+ } catch (InterruptedException e) {
+ logger.error("reset coprocessor failed: ", e);
+ }
+
+ coprocessorPool.shutdown();
+ }
+
+ private static class ResetCoprocessorWorker implements Runnable {
+ private final CountDownLatch countDownLatch;
+ private final HBaseAdmin hbaseAdmin;
+ private final Path hdfsCoprocessorJar;
+ private final String tableName;
+
+ public ResetCoprocessorWorker(CountDownLatch countDownLatch, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, String tableName) {
+ this.countDownLatch = countDownLatch;
+ this.hbaseAdmin = hbaseAdmin;
+ this.hdfsCoprocessorJar = hdfsCoprocessorJar;
+ this.tableName = tableName;
+ }
+
+ @Override
+ public void run() {
try {
resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar);
- processed.add(tableName);
- } catch (IOException ex) {
+ processedTables.add(tableName);
+ } catch (Exception ex) {
logger.error("Error processing " + tableName, ex);
+ } finally {
+ countDownLatch.countDown();
}
+
}
- return processed;
}
public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException {