You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/02/11 13:49:45 UTC
[06/51] [partial] kylin git commit: KYLIN-1416 keep only website in
document branch
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java b/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
deleted file mode 100644
index 44bc2c3..0000000
--- a/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
+++ /dev/null
@@ -1,523 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.tools;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hdfs.web.JsonUtil;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.*;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.DictionaryInfo;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.lookup.SnapshotManager;
-import org.apache.kylin.dict.lookup.SnapshotTable;
-import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.metadata.realization.IRealizationConstants;
-import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Created by honma on 9/3/14.
- * <p/>
- * This tool serves for the purpose of migrating cubes. e.g. upgrade cube from
- * dev env to test(prod) env, or vice versa.
- * <p/>
- * Note that different envs are assumed to share the same hadoop cluster,
- * including hdfs, hbase and hive.
- */
-public class CubeMigrationCLI {
-
- private static final Logger logger = LoggerFactory.getLogger(CubeMigrationCLI.class);
-
- private static List<Opt> operations;
- private static KylinConfig srcConfig;
- private static KylinConfig dstConfig;
- private static ResourceStore srcStore;
- private static ResourceStore dstStore;
- private static FileSystem hdfsFS;
- private static HBaseAdmin hbaseAdmin;
-
- public static final String ACL_INFO_FAMILY = "i";
- private static final String ACL_TABLE_NAME = "_acl";
- private static final String ACL_INFO_FAMILY_TYPE_COLUMN = "t";
- private static final String ACL_INFO_FAMILY_OWNER_COLUMN = "o";
- private static final String ACL_INFO_FAMILY_PARENT_COLUMN = "p";
-
- public static void main(String[] args) throws IOException, InterruptedException {
-
- if (args.length != 8) {
- usage();
- System.exit(1);
- }
-
- moveCube(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7]);
- }
-
- private static void usage() {
- System.out.println("Usage: CubeMigrationCLI srcKylinConfigUri dstKylinConfigUri cubeName projectName copyAclOrNot purgeOrNot overwriteIfExists realExecute");
- System.out.println(" srcKylinConfigUri: The KylinConfig of the cube’s source \n" + "dstKylinConfigUri: The KylinConfig of the cube’s new home \n" + "cubeName: the name of cube to be migrated. \n" + "projectName: The target project in the target environment.(Make sure it exist) \n" + "copyAclOrNot: true or false: whether copy cube ACL to target environment. \n" + "purgeOrNot: true or false: whether purge the cube from src server after the migration. \n" + "overwriteIfExists: overwrite cube if it already exists in the target environment. \n" + "realExecute: if false, just print the operations to take, if true, do the real migration. \n");
-
- }
-
- public static void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, String copyAcl, String purgeAndDisable, String overwriteIfExists, String realExecute) throws IOException, InterruptedException {
-
- srcConfig = srcCfg;
- srcStore = ResourceStore.getStore(srcConfig);
- dstConfig = dstCfg;
- dstStore = ResourceStore.getStore(dstConfig);
-
- CubeManager cubeManager = CubeManager.getInstance(srcConfig);
- CubeInstance cube = cubeManager.getCube(cubeName);
- logger.info("cube to be moved is : " + cubeName);
-
- if (cube.getStatus() != RealizationStatusEnum.READY)
- throw new IllegalStateException("Cannot migrate cube that is not in READY state.");
-
- for (CubeSegment segment : cube.getSegments()) {
- if (segment.getStatus() != SegmentStatusEnum.READY) {
- throw new IllegalStateException("At least one segment is not in READY state");
- }
- }
-
- checkAndGetHbaseUrl();
-
- Configuration conf = HBaseConfiguration.create();
- hbaseAdmin = new HBaseAdmin(conf);
-
- hdfsFS = FileSystem.get(new Configuration());
-
- operations = new ArrayList<Opt>();
-
- copyFilesInMetaStore(cube, overwriteIfExists);
- renameFoldersInHdfs(cube);
- changeHtableHost(cube);
- addCubeIntoProject(cubeName, projectName);
- if (Boolean.parseBoolean(copyAcl) == true) {
- copyACL(cube, projectName);
- }
-
- if (Boolean.parseBoolean(purgeAndDisable) == true) {
- purgeAndDisable(cubeName); // this should be the last action
- }
-
- if (realExecute.equalsIgnoreCase("true")) {
- doOpts();
- } else {
- showOpts();
- }
- }
-
- public static void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl, String purgeAndDisable, String overwriteIfExists, String realExecute) throws IOException, InterruptedException {
-
- moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName, projectName, copyAcl, purgeAndDisable, overwriteIfExists, realExecute);
- }
-
- private static String checkAndGetHbaseUrl() {
- String srcMetadataUrl = srcConfig.getMetadataUrl();
- String dstMetadataUrl = dstConfig.getMetadataUrl();
-
- logger.info("src metadata url is " + srcMetadataUrl);
- logger.info("dst metadata url is " + dstMetadataUrl);
-
- int srcIndex = srcMetadataUrl.toLowerCase().indexOf("hbase");
- int dstIndex = dstMetadataUrl.toLowerCase().indexOf("hbase");
- if (srcIndex < 0 || dstIndex < 0)
- throw new IllegalStateException("Both metadata urls should be hbase metadata url");
-
- String srcHbaseUrl = srcMetadataUrl.substring(srcIndex).trim();
- String dstHbaseUrl = dstMetadataUrl.substring(dstIndex).trim();
- if (!srcHbaseUrl.equalsIgnoreCase(dstHbaseUrl)) {
- throw new IllegalStateException("hbase url not equal! ");
- }
-
- logger.info("hbase url is " + srcHbaseUrl.trim());
- return srcHbaseUrl.trim();
- }
-
- private static void renameFoldersInHdfs(CubeInstance cube) {
- for (CubeSegment segment : cube.getSegments()) {
-
- String jobUuid = segment.getLastBuildJobID();
- String src = JobInstance.getJobWorkingDir(jobUuid, srcConfig.getHdfsWorkingDirectory());
- String tgt = JobInstance.getJobWorkingDir(jobUuid, dstConfig.getHdfsWorkingDirectory());
-
- operations.add(new Opt(OptType.RENAME_FOLDER_IN_HDFS, new Object[] { src, tgt }));
- }
-
- }
-
- private static void changeHtableHost(CubeInstance cube) {
- for (CubeSegment segment : cube.getSegments()) {
- operations.add(new Opt(OptType.CHANGE_HTABLE_HOST, new Object[] { segment.getStorageLocationIdentifier() }));
- }
- }
-
- private static void copyACL(CubeInstance cube, String projectName) {
- operations.add(new Opt(OptType.COPY_ACL, new Object[] { cube.getUuid(), cube.getDescriptor().getModel().getUuid(), projectName }));
- }
-
- private static void copyFilesInMetaStore(CubeInstance cube, String overwriteIfExists) throws IOException {
-
- List<String> metaItems = new ArrayList<String>();
- List<String> dictAndSnapshot = new ArrayList<String>();
- listCubeRelatedResources(cube, metaItems, dictAndSnapshot);
-
- if (dstStore.exists(cube.getResourcePath()) && !overwriteIfExists.equalsIgnoreCase("true"))
- throw new IllegalStateException("The cube named " + cube.getName() + " already exists on target metadata store. Use overwriteIfExists to overwrite it");
-
- for (String item : metaItems) {
- operations.add(new Opt(OptType.COPY_FILE_IN_META, new Object[] { item }));
- }
-
- for (String item : dictAndSnapshot) {
- operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[] { item, cube.getName() }));
- }
- }
-
- private static void addCubeIntoProject(String cubeName, String projectName) throws IOException {
- String projectResPath = ProjectInstance.concatResourcePath(projectName);
- if (!dstStore.exists(projectResPath))
- throw new IllegalStateException("The target project " + projectName + "does not exist");
-
- operations.add(new Opt(OptType.ADD_INTO_PROJECT, new Object[] { cubeName, projectName }));
- }
-
-
- private static void purgeAndDisable(String cubeName) throws IOException {
- operations.add(new Opt(OptType.PURGE_AND_DISABLE, new Object[] { cubeName }));
- }
-
- private static void listCubeRelatedResources(CubeInstance cube, List<String> metaResource, List<String> dictAndSnapshot) throws IOException {
-
- CubeDesc cubeDesc = cube.getDescriptor();
- metaResource.add(cube.getResourcePath());
- metaResource.add(cubeDesc.getResourcePath());
- metaResource.add(DataModelDesc.concatResourcePath(cubeDesc.getModelName()));
-
- for (String table : cubeDesc.getModel().getAllTables()) {
- metaResource.add(TableDesc.concatResourcePath(table.toUpperCase()));
- }
-
- for (CubeSegment segment : cube.getSegments()) {
- dictAndSnapshot.addAll(segment.getSnapshotPaths());
- dictAndSnapshot.addAll(segment.getDictionaryPaths());
- }
- }
-
- private static enum OptType {
- COPY_FILE_IN_META, COPY_DICT_OR_SNAPSHOT, RENAME_FOLDER_IN_HDFS, ADD_INTO_PROJECT, CHANGE_HTABLE_HOST, COPY_ACL, PURGE_AND_DISABLE
- }
-
- private static class Opt {
- private OptType type;
- private Object[] params;
-
- private Opt(OptType type, Object[] params) {
- this.type = type;
- this.params = params;
- }
-
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append(type).append(":");
- for (Object s : params)
- sb.append(s).append(", ");
- return sb.toString();
- }
-
- }
-
- private static void showOpts() {
- for (int i = 0; i < operations.size(); ++i) {
- showOpt(operations.get(i));
- }
- }
-
- private static void showOpt(Opt opt) {
- logger.info("Operation: " + opt.toString());
- }
-
- private static void doOpts() throws IOException, InterruptedException {
- int index = 0;
- try {
- for (; index < operations.size(); ++index) {
- logger.info("Operation index :" + index);
- doOpt(operations.get(index));
- }
- } catch (Exception e) {
- logger.error("error met", e);
- logger.info("Try undoing previous changes");
- // undo:
- for (int i = index; i >= 0; --i) {
- try {
- undo(operations.get(i));
- } catch (Exception ee) {
- logger.error("error met ", e);
- logger.info("Continue undoing...");
- }
- }
-
- throw new RuntimeException("Cube moving failed");
- }
- }
-
- private static void doOpt(Opt opt) throws IOException, InterruptedException {
- logger.info("Executing operation: " + opt.toString());
-
- switch (opt.type) {
- case CHANGE_HTABLE_HOST: {
- String tableName = (String) opt.params[0];
- HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
- hbaseAdmin.disableTable(tableName);
- desc.setValue(IRealizationConstants.HTableTag, dstConfig.getMetadataUrlPrefix());
- hbaseAdmin.modifyTable(tableName, desc);
- hbaseAdmin.enableTable(tableName);
- logger.info("CHANGE_HTABLE_HOST is completed");
- break;
- }
- case COPY_FILE_IN_META: {
- String item = (String) opt.params[0];
- RawResource res = srcStore.getResource(item);
- dstStore.putResource(item, res.inputStream, res.timestamp);
- res.inputStream.close();
- logger.info("Item " + item + " is copied");
- break;
- }
- case COPY_DICT_OR_SNAPSHOT: {
- String item = (String) opt.params[0];
-
- if (item.toLowerCase().endsWith(".dict")) {
- DictionaryManager dstDictMgr = DictionaryManager.getInstance(dstConfig);
- DictionaryManager srcDicMgr = DictionaryManager.getInstance(srcConfig);
- DictionaryInfo dictSrc = srcDicMgr.getDictionaryInfo(item);
-
- long ts = dictSrc.getLastModified();
- dictSrc.setLastModified(0);//to avoid resource store write conflict
- DictionaryInfo dictSaved = dstDictMgr.trySaveNewDict(dictSrc.getDictionaryObject(), dictSrc);
- dictSrc.setLastModified(ts);
-
- if (dictSaved == dictSrc) {
- //no dup found, already saved to dest
- logger.info("Item " + item + " is copied");
- } else {
- //dictSrc is rejected because of duplication
- //modify cube's dictionary path
- String cubeName = (String) opt.params[1];
- String cubeResPath = CubeInstance.concatResourcePath(cubeName);
- Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
- CubeInstance cube = dstStore.getResource(cubeResPath, CubeInstance.class, cubeSerializer);
- for (CubeSegment segment : cube.getSegments()) {
- for (Map.Entry<String, String> entry : segment.getDictionaries().entrySet()) {
- if (entry.getValue().equalsIgnoreCase(item)) {
- entry.setValue(dictSaved.getResourcePath());
- }
- }
- }
- dstStore.putResource(cubeResPath, cube, cubeSerializer);
- logger.info("Item " + item + " is dup, instead " + dictSaved.getResourcePath() + " is reused");
- }
-
- } else if (item.toLowerCase().endsWith(".snapshot")) {
- SnapshotManager dstSnapMgr = SnapshotManager.getInstance(dstConfig);
- SnapshotManager srcSnapMgr = SnapshotManager.getInstance(srcConfig);
- SnapshotTable snapSrc = srcSnapMgr.getSnapshotTable(item);
-
- long ts = snapSrc.getLastModified();
- snapSrc.setLastModified(0);
- SnapshotTable snapSaved = dstSnapMgr.trySaveNewSnapshot(snapSrc);
- snapSrc.setLastModified(ts);
-
- if (snapSaved == snapSrc) {
- //no dup found, already saved to dest
- logger.info("Item " + item + " is copied");
-
- } else {
- String cubeName = (String) opt.params[1];
- String cubeResPath = CubeInstance.concatResourcePath(cubeName);
- Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
- CubeInstance cube = dstStore.getResource(cubeResPath, CubeInstance.class, cubeSerializer);
- for (CubeSegment segment : cube.getSegments()) {
- for (Map.Entry<String, String> entry : segment.getSnapshots().entrySet()) {
- if (entry.getValue().equalsIgnoreCase(item)) {
- entry.setValue(snapSaved.getResourcePath());
- }
- }
- }
- dstStore.putResource(cubeResPath, cube, cubeSerializer);
- logger.info("Item " + item + " is dup, instead " + snapSaved.getResourcePath() + " is reused");
-
- }
-
- } else {
- logger.error("unknown item found: " + item);
- logger.info("ignore it");
- }
-
- break;
- }
- case RENAME_FOLDER_IN_HDFS: {
- String srcPath = (String) opt.params[0];
- String dstPath = (String) opt.params[1];
- hdfsFS.rename(new Path(srcPath), new Path(dstPath));
- logger.info("HDFS Folder renamed from " + srcPath + " to " + dstPath);
- break;
- }
- case ADD_INTO_PROJECT: {
- String cubeName = (String) opt.params[0];
- String projectName = (String) opt.params[1];
- String projectResPath = ProjectInstance.concatResourcePath(projectName);
- Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class);
- ProjectInstance project = dstStore.getResource(projectResPath, ProjectInstance.class, projectSerializer);
- project.removeRealization(RealizationType.CUBE, cubeName);
- project.addRealizationEntry(RealizationType.CUBE, cubeName);
- dstStore.putResource(projectResPath, project, projectSerializer);
- logger.info("Project instance for " + projectName + " is corrected");
- break;
- }
- case COPY_ACL: {
- String cubeId = (String) opt.params[0];
- String modelId = (String) opt.params[1];
- String projectName = (String) opt.params[2];
- String projectResPath = ProjectInstance.concatResourcePath(projectName);
- Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class);
- ProjectInstance project = dstStore.getResource(projectResPath, ProjectInstance.class, projectSerializer);
- String projUUID = project.getUuid();
- HTableInterface srcAclHtable = null;
- HTableInterface destAclHtable = null;
- try {
- srcAclHtable = HBaseConnection.get(srcConfig.getMetadataUrl()).getTable(srcConfig.getMetadataUrlPrefix() + "_acl");
- destAclHtable = HBaseConnection.get(dstConfig.getMetadataUrl()).getTable(dstConfig.getMetadataUrlPrefix() + "_acl");
-
- // cube acl
- Result result = srcAclHtable.get(new Get(Bytes.toBytes(cubeId)));
- if (result.listCells() != null) {
- for (Cell cell : result.listCells()) {
- byte[] family = CellUtil.cloneFamily(cell);
- byte[] column = CellUtil.cloneQualifier(cell);
- byte[] value = CellUtil.cloneValue(cell);
-
- // use the target project uuid as the parent
- if (Bytes.toString(family).equals(ACL_INFO_FAMILY) && Bytes.toString(column).equals(ACL_INFO_FAMILY_PARENT_COLUMN)) {
- String valueString = "{\"id\":\"" + projUUID + "\",\"type\":\"org.apache.kylin.metadata.project.ProjectInstance\"}";
- value = Bytes.toBytes(valueString);
- }
- Put put = new Put(Bytes.toBytes(cubeId));
- put.add(family, column, value);
- destAclHtable.put(put);
- }
- }
- destAclHtable.flushCommits();
- } finally {
- IOUtils.closeQuietly(srcAclHtable);
- IOUtils.closeQuietly(destAclHtable);
- }
- break;
- }
- case PURGE_AND_DISABLE:{
- String cubeName = (String) opt.params[0];
- String cubeResPath = CubeInstance.concatResourcePath(cubeName);
- Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
- CubeInstance cube = srcStore.getResource(cubeResPath, CubeInstance.class, cubeSerializer);
- cube.getSegments().clear();
- cube.setStatus(RealizationStatusEnum.DISABLED);
- srcStore.putResource(cubeResPath, cube, cubeSerializer);
- logger.info("Cube " + cubeName + " is purged and disabled in " + srcConfig.getMetadataUrl());
- }
- }
- }
-
- private static void undo(Opt opt) throws IOException, InterruptedException {
- logger.info("Undo operation: " + opt.toString());
-
- switch (opt.type) {
- case CHANGE_HTABLE_HOST: {
- String tableName = (String) opt.params[0];
- HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
- hbaseAdmin.disableTable(tableName);
- desc.setValue(IRealizationConstants.HTableTag, srcConfig.getMetadataUrlPrefix());
- hbaseAdmin.modifyTable(tableName, desc);
- hbaseAdmin.enableTable(tableName);
- break;
- }
- case COPY_FILE_IN_META: {
- // no harm
- logger.info("Undo for COPY_FILE_IN_META is ignored");
- break;
- }
- case COPY_DICT_OR_SNAPSHOT: {
- // no harm
- logger.info("Undo for COPY_DICT_OR_SNAPSHOT is ignored");
- break;
- }
- case RENAME_FOLDER_IN_HDFS: {
- String srcPath = (String) opt.params[1];
- String dstPath = (String) opt.params[0];
-
- if (hdfsFS.exists(new Path(srcPath)) && !hdfsFS.exists(new Path(dstPath))) {
- hdfsFS.rename(new Path(srcPath), new Path(dstPath));
- logger.info("HDFS Folder renamed from " + srcPath + " to " + dstPath);
- }
- break;
- }
- case ADD_INTO_PROJECT: {
- logger.info("Undo for ADD_INTO_PROJECT is ignored");
- break;
- }
- case COPY_ACL: {
- String cubeId = (String) opt.params[0];
- String modelId = (String) opt.params[1];
- HTableInterface destAclHtable = null;
- try {
- destAclHtable = HBaseConnection.get(dstConfig.getMetadataUrl()).getTable(dstConfig.getMetadataUrlPrefix() + "_acl");
-
- destAclHtable.delete(new Delete(Bytes.toBytes(cubeId)));
- destAclHtable.delete(new Delete(Bytes.toBytes(modelId)));
- destAclHtable.flushCommits();
- } finally {
- IOUtils.closeQuietly(destAclHtable);
- }
- break;
- }
- case PURGE_AND_DISABLE: {
- logger.info("Undo for PURGE_AND_DISABLE is not supported");
- break;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/tools/DefaultSslProtocolSocketFactory.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/DefaultSslProtocolSocketFactory.java b/job/src/main/java/org/apache/kylin/job/tools/DefaultSslProtocolSocketFactory.java
deleted file mode 100644
index 3a2a88c..0000000
--- a/job/src/main/java/org/apache/kylin/job/tools/DefaultSslProtocolSocketFactory.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.tools;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.net.UnknownHostException;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
-
-import org.apache.commons.httpclient.ConnectTimeoutException;
-import org.apache.commons.httpclient.HttpClientError;
-import org.apache.commons.httpclient.params.HttpConnectionParams;
-import org.apache.commons.httpclient.protocol.ControllerThreadSocketFactory;
-import org.apache.commons.httpclient.protocol.SecureProtocolSocketFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xduo
- *
- */
-public class DefaultSslProtocolSocketFactory implements SecureProtocolSocketFactory {
- /** Log object for this class. */
- private static Logger LOG = LoggerFactory.getLogger(DefaultSslProtocolSocketFactory.class);
- private SSLContext sslcontext = null;
-
- /**
- * Constructor for DefaultSslProtocolSocketFactory.
- */
- public DefaultSslProtocolSocketFactory() {
- super();
- }
-
- /**
- * @see SecureProtocolSocketFactory#createSocket(java.lang.String,int,java.net.InetAddress,int)
- */
- public Socket createSocket(String host, int port, InetAddress clientHost, int clientPort) throws IOException, UnknownHostException {
- return getSSLContext().getSocketFactory().createSocket(host, port, clientHost, clientPort);
- }
-
- /**
- * Attempts to get a new socket connection to the given host within the
- * given time limit.
- *
- * <p>
- * To circumvent the limitations of older JREs that do not support connect
- * timeout a controller thread is executed. The controller thread attempts
- * to create a new socket within the given limit of time. If socket
- * constructor does not return until the timeout expires, the controller
- * terminates and throws an {@link ConnectTimeoutException}
- * </p>
- *
- * @param host
- * the host name/IP
- * @param port
- * the port on the host
- * @param localAddress
- * the local host name/IP to bind the socket to
- * @param localPort
- * the port on the local machine
- * @param params
- * {@link HttpConnectionParams Http connection parameters}
- *
- * @return Socket a new socket
- *
- * @throws IOException
- * if an I/O error occurs while creating the socket
- * @throws UnknownHostException
- * if the IP address of the host cannot be determined
- * @throws ConnectTimeoutException
- * DOCUMENT ME!
- * @throws IllegalArgumentException
- * DOCUMENT ME!
- */
- public Socket createSocket(final String host, final int port, final InetAddress localAddress, final int localPort, final HttpConnectionParams params) throws IOException, UnknownHostException, ConnectTimeoutException {
- if (params == null) {
- throw new IllegalArgumentException("Parameters may not be null");
- }
-
- int timeout = params.getConnectionTimeout();
-
- if (timeout == 0) {
- return createSocket(host, port, localAddress, localPort);
- } else {
- // To be eventually deprecated when migrated to Java 1.4 or above
- return ControllerThreadSocketFactory.createSocket(this, host, port, localAddress, localPort, timeout);
- }
- }
-
- /**
- * @see SecureProtocolSocketFactory#createSocket(java.lang.String,int)
- */
- public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
- return getSSLContext().getSocketFactory().createSocket(host, port);
- }
-
- /**
- * @see SecureProtocolSocketFactory#createSocket(java.net.Socket,java.lang.String,int,boolean)
- */
- public Socket createSocket(Socket socket, String host, int port, boolean autoClose) throws IOException, UnknownHostException {
- return getSSLContext().getSocketFactory().createSocket(socket, host, port, autoClose);
- }
-
- public boolean equals(Object obj) {
- return ((obj != null) && obj.getClass().equals(DefaultX509TrustManager.class));
- }
-
- public int hashCode() {
- return DefaultX509TrustManager.class.hashCode();
- }
-
- private static SSLContext createEasySSLContext() {
- try {
- SSLContext context = SSLContext.getInstance("TLS");
- context.init(null, new TrustManager[] { new DefaultX509TrustManager(null) }, null);
-
- return context;
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- throw new HttpClientError(e.toString());
- }
- }
-
- private SSLContext getSSLContext() {
- if (this.sslcontext == null) {
- this.sslcontext = createEasySSLContext();
- }
-
- return this.sslcontext;
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/tools/DefaultX509TrustManager.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/DefaultX509TrustManager.java b/job/src/main/java/org/apache/kylin/job/tools/DefaultX509TrustManager.java
deleted file mode 100644
index bd28245..0000000
--- a/job/src/main/java/org/apache/kylin/job/tools/DefaultX509TrustManager.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.tools;
-
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.TrustManagerFactory;
-import javax.net.ssl.X509TrustManager;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xduo
- *
- */
-public class DefaultX509TrustManager implements X509TrustManager {
-
- /** Log object for this class. */
- private static Logger LOG = LoggerFactory.getLogger(DefaultX509TrustManager.class);
- private X509TrustManager standardTrustManager = null;
-
- /**
- * Constructor for DefaultX509TrustManager.
- *
- */
- public DefaultX509TrustManager(KeyStore keystore) throws NoSuchAlgorithmException, KeyStoreException {
- super();
-
- TrustManagerFactory factory = TrustManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
- factory.init(keystore);
-
- TrustManager[] trustmanagers = factory.getTrustManagers();
-
- if (trustmanagers.length == 0) {
- throw new NoSuchAlgorithmException("SunX509 trust manager not supported");
- }
-
- this.standardTrustManager = (X509TrustManager) trustmanagers[0];
- }
-
- public X509Certificate[] getAcceptedIssuers() {
- return this.standardTrustManager.getAcceptedIssuers();
- }
-
- public boolean isClientTrusted(X509Certificate[] certificates) {
- return true;
- // return this.standardTrustManager.isClientTrusted(certificates);
- }
-
- public boolean isServerTrusted(X509Certificate[] certificates) {
- if ((certificates != null) && LOG.isDebugEnabled()) {
- LOG.debug("Server certificate chain:");
-
- for (int i = 0; i < certificates.length; i++) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("X509Certificate[" + i + "]=" + certificates[i]);
- }
- }
- }
-
- if ((certificates != null) && (certificates.length == 1)) {
- X509Certificate certificate = certificates[0];
-
- try {
- certificate.checkValidity();
- } catch (CertificateException e) {
- LOG.error(e.toString());
-
- return false;
- }
-
- return true;
- } else {
- return true;
- // return this.standardTrustManager.isServerTrusted(certificates);
- }
- }
-
- @Override
- public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
- // TODO Auto-generated method stub
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
deleted file mode 100644
index bf655a0..0000000
--- a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.tools;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Matcher;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- * @author yangli9
- */
-public class DeployCoprocessorCLI {
-
- public static final String CubeObserverClassV2 = "org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.AggregateRegionObserver";
- public static final String CubeEndpointClassV2 = "org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.CubeVisitService";
- public static final String IIEndpointClassV2 = "org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.IIEndpoint";
- public static final String OBSERVER_CLS_NAME = "org.apache.kylin.storage.hbase.coprocessor.observer.AggregateRegionObserver";
- public static final String ENDPOINT_CLS_NAMAE = "org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint";
- private static final Logger logger = LoggerFactory.getLogger(DeployCoprocessorCLI.class);
-
- public static void main(String[] args) throws IOException {
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
- FileSystem fileSystem = FileSystem.get(hconf);
- HBaseAdmin hbaseAdmin = new HBaseAdmin(hconf);
-
- String localCoprocessorJar = new File(args[0]).getAbsolutePath();
- logger.info("Identify coprocessor jar " + localCoprocessorJar);
-
- List<String> tableNames = getHTableNames(kylinConfig);
- logger.info("Identify tables " + tableNames);
-
- if (args.length <= 1) {
- printUsageAndExit();
- }
-
- String filterType = args[1].toLowerCase();
- if (filterType.equals("-table")) {
- tableNames = filterByTables(tableNames, Arrays.asList(args).subList(2, args.length));
- } else if (filterType.equals("-cube")) {
- tableNames = filterByCubes(tableNames, Arrays.asList(args).subList(2, args.length));
- } else if (!filterType.equals("all")) {
- printUsageAndExit();
- }
-
- logger.info("Will execute tables " + tableNames);
-
- Set<String> oldJarPaths = getCoprocessorJarPaths(hbaseAdmin, tableNames);
- logger.info("Old coprocessor jar: " + oldJarPaths);
-
- Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths);
- logger.info("New coprocessor jar: " + hdfsCoprocessorJar);
-
- List<String> processedTables = resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames);
-
- // Don't remove old jars, missing coprocessor jar will fail hbase
- // removeOldJars(oldJarPaths, fileSystem);
-
- hbaseAdmin.close();
-
- logger.info("Processed " + processedTables);
- logger.info("Active coprocessor jar: " + hdfsCoprocessorJar);
- }
-
- private static void printUsageAndExit() {
- logger.warn("Probe run, exiting.");
- logger.info("Usage: bin/kylin.sh org.apache.kylin.job.tools.DeployCoprocessorCLI JAR_FILE all|-cube CUBE1 CUBE2|-table TABLE1 TABLE2");
- System.exit(0);
- }
-
- private static List<String> filterByCubes(List<String> allTableNames, List<String> cubeNames) {
- CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- List<String> result = Lists.newArrayList();
- for (String c : cubeNames) {
- c = c.trim();
- if (c.endsWith(","))
- c = c.substring(0, c.length() - 1);
-
- CubeInstance cubeInstance = cubeManager.getCube(c);
- for (CubeSegment segment : cubeInstance.getSegments()) {
- String tableName = segment.getStorageLocationIdentifier();
- if (allTableNames.contains(tableName)) {
- result.add(tableName);
- }
- }
- }
- return result;
- }
-
- private static List<String> filterByTables(List<String> allTableNames, List<String> tableNames) {
- List<String> result = Lists.newArrayList();
- for (String t : tableNames) {
- t = t.trim();
- if (t.endsWith(","))
- t = t.substring(0, t.length() - 1);
-
- if (allTableNames.contains(t)) {
- result.add(t);
- }
- }
- return result;
- }
-
- public static void deployCoprocessor(HTableDescriptor tableDesc) {
- try {
- initHTableCoprocessor(tableDesc);
- logger.info("hbase table " + tableDesc.getName() + " deployed with coprocessor.");
-
- } catch (Exception ex) {
- logger.error("Error deploying coprocessor on " + tableDesc.getName(), ex);
- logger.error("Will try creating the table without coprocessor.");
- }
- }
-
- private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
- FileSystem fileSystem = FileSystem.get(hconf);
-
- String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
- Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, null);
-
- DeployCoprocessorCLI.addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
- }
-
- public static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException {
- logger.info("Add coprocessor on " + desc.getNameAsString());
- desc.addCoprocessor(ENDPOINT_CLS_NAMAE, hdfsCoprocessorJar, 1000, null);
- desc.addCoprocessor(OBSERVER_CLS_NAME, hdfsCoprocessorJar, 1001, null);
- }
-
- public static void resetCoprocessor(String tableName, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException {
- logger.info("Disable " + tableName);
- hbaseAdmin.disableTable(tableName);
-
- logger.info("Unset coprocessor on " + tableName);
- HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
-
- // remove coprocessors of 1.x version
- while (desc.hasCoprocessor(OBSERVER_CLS_NAME)) {
- desc.removeCoprocessor(OBSERVER_CLS_NAME);
- }
- while (desc.hasCoprocessor(ENDPOINT_CLS_NAMAE)) {
- desc.removeCoprocessor(ENDPOINT_CLS_NAMAE);
- }
- // remove coprocessors of 2.x version
- while (desc.hasCoprocessor(CubeObserverClassV2)) {
- desc.removeCoprocessor(CubeObserverClassV2);
- }
- while (desc.hasCoprocessor(CubeEndpointClassV2)) {
- desc.removeCoprocessor(CubeEndpointClassV2);
- }
- while (desc.hasCoprocessor(IIEndpointClassV2)) {
- desc.removeCoprocessor(IIEndpointClassV2);
- }
-
- addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
- hbaseAdmin.modifyTable(tableName, desc);
-
- logger.info("Enable " + tableName);
- hbaseAdmin.enableTable(tableName);
- }
-
- private static List<String> resetCoprocessorOnHTables(HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
- List<String> processed = new ArrayList<String>();
-
- for (String tableName : tableNames) {
- try {
- resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar);
- processed.add(tableName);
- } catch (IOException ex) {
- logger.error("Error processing " + tableName, ex);
- }
- }
- return processed;
- }
-
- public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException {
- Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, config);
- FileStatus newestJar = null;
- for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
- if (fileStatus.getPath().toString().endsWith(".jar")) {
- if (newestJar == null) {
- newestJar = fileStatus;
- } else {
- if (newestJar.getModificationTime() < fileStatus.getModificationTime())
- newestJar = fileStatus;
- }
- }
- }
- if (newestJar == null)
- return null;
-
- Path path = newestJar.getPath().makeQualified(fileSystem.getUri(), null);
- logger.info("The newest coprocessor is " + path.toString());
- return path;
- }
-
- public static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, Set<String> oldJarPaths) throws IOException {
- Path uploadPath = null;
- File localCoprocessorFile = new File(localCoprocessorJar);
-
- // check existing jars
- if (oldJarPaths == null) {
- oldJarPaths = new HashSet<String>();
- }
- Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv());
- for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
- if (fileStatus.getLen() == localCoprocessorJar.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified()) {
- uploadPath = fileStatus.getPath();
- break;
- }
- String filename = fileStatus.getPath().toString();
- if (filename.endsWith(".jar")) {
- oldJarPaths.add(filename);
- }
- }
-
- // upload if not existing
- if (uploadPath == null) {
- // figure out a unique new jar file name
- Set<String> oldJarNames = new HashSet<String>();
- for (String path : oldJarPaths) {
- oldJarNames.add(new Path(path).getName());
- }
- String baseName = getBaseFileName(localCoprocessorJar);
- String newName = null;
- int i = 0;
- while (newName == null) {
- newName = baseName + "-" + (i++) + ".jar";
- if (oldJarNames.contains(newName))
- newName = null;
- }
-
- // upload
- uploadPath = new Path(coprocessorDir, newName);
- FileInputStream in = null;
- FSDataOutputStream out = null;
- try {
- in = new FileInputStream(localCoprocessorFile);
- out = fileSystem.create(uploadPath);
- IOUtils.copy(in, out);
- } finally {
- IOUtils.closeQuietly(in);
- IOUtils.closeQuietly(out);
- }
-
- fileSystem.setTimes(uploadPath, localCoprocessorFile.lastModified(), -1);
-
- }
-
- uploadPath = uploadPath.makeQualified(fileSystem.getUri(), null);
- return uploadPath;
- }
-
- private static String getBaseFileName(String localCoprocessorJar) {
- File localJar = new File(localCoprocessorJar);
- String baseName = localJar.getName();
- if (baseName.endsWith(".jar"))
- baseName = baseName.substring(0, baseName.length() - ".jar".length());
- return baseName;
- }
-
- private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException {
- String hdfsWorkingDirectory = config.getHdfsWorkingDirectory();
- Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor");
- fileSystem.mkdirs(coprocessorDir);
- return coprocessorDir;
- }
-
- private static Set<String> getCoprocessorJarPaths(HBaseAdmin hbaseAdmin, List<String> tableNames) throws IOException {
- HashSet<String> result = new HashSet<String>();
-
- for (String tableName : tableNames) {
- HTableDescriptor tableDescriptor = null;
- try {
- tableDescriptor = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
- } catch (TableNotFoundException e) {
- logger.warn("Table not found " + tableName, e);
- continue;
- }
-
- Matcher keyMatcher;
- Matcher valueMatcher;
- for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e : tableDescriptor.getValues().entrySet()) {
- keyMatcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(e.getKey().get()));
- if (!keyMatcher.matches()) {
- continue;
- }
- valueMatcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(Bytes.toString(e.getValue().get()));
- if (!valueMatcher.matches()) {
- continue;
- }
-
- String jarPath = valueMatcher.group(1).trim();
- String clsName = valueMatcher.group(2).trim();
-
- if (OBSERVER_CLS_NAME.equals(clsName)) {
- result.add(jarPath);
- }
- }
- }
-
- return result;
- }
-
- private static List<String> getHTableNames(KylinConfig config) {
- CubeManager cubeMgr = CubeManager.getInstance(config);
-
- ArrayList<String> result = new ArrayList<String>();
- for (CubeInstance cube : cubeMgr.listAllCubes()) {
- for (CubeSegment seg : cube.getSegments(SegmentStatusEnum.READY)) {
- String tableName = seg.getStorageLocationIdentifier();
- if (StringUtils.isBlank(tableName) == false) {
- result.add(tableName);
- System.out.println("added new table: " + tableName);
- }
- }
- }
-
- for (IIInstance ii : IIManager.getInstance(config).listAllIIs()) {
- for (IISegment seg : ii.getSegments(SegmentStatusEnum.READY)) {
- String tableName = seg.getStorageLocationIdentifier();
- if (StringUtils.isBlank(tableName) == false) {
- result.add(tableName);
- System.out.println("added new table: " + tableName);
- }
- }
- }
-
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java b/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java
deleted file mode 100644
index 70e1df6..0000000
--- a/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.tools;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
-import org.apache.kylin.common.persistence.HBaseConnection;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.Pair;
-
-import com.google.common.collect.Lists;
-
-public class GridTableHBaseBenchmark {
-
- private static final String TEST_TABLE = "GridTableTest";
- private static final byte[] CF = "F".getBytes();
- private static final byte[] QN = "C".getBytes();
- private static final int N_ROWS = 10000;
- private static final int CELL_SIZE = 128 * 1024; // 128 KB
- private static final double DFT_HIT_RATIO = 0.3;
- private static final double DFT_INDEX_RATIO = 0.1;
- private static final int ROUND = 3;
-
- public static void main(String[] args) throws IOException {
- double hitRatio = DFT_HIT_RATIO;
- try {
- hitRatio = Double.parseDouble(args[0]);
- } catch (Exception e) {
- // nevermind
- }
-
- double indexRatio = DFT_INDEX_RATIO;
- try {
- indexRatio = Double.parseDouble(args[1]);
- } catch (Exception e) {
- // nevermind
- }
-
- testGridTable(hitRatio, indexRatio);
- }
-
- public static void testGridTable(double hitRatio, double indexRatio) throws IOException {
- System.out.println("Testing grid table scanning, hit ratio " + hitRatio + ", index ratio " + indexRatio);
- String hbaseUrl = "hbase"; // use hbase-site.xml on classpath
-
- HConnection conn = HBaseConnection.get(hbaseUrl);
- createHTableIfNeeded(conn, TEST_TABLE);
- prepareData(conn);
-
- Hits hits = new Hits(N_ROWS, hitRatio, indexRatio);
-
- for (int i = 0; i < ROUND; i++) {
- System.out.println("==================================== ROUND " + (i + 1) + " ========================================");
- testRowScanWithIndex(conn, hits.getHitsForRowScanWithIndex());
- testRowScanNoIndexFullScan(conn, hits.getHitsForRowScanNoIndex());
- testRowScanNoIndexSkipScan(conn, hits.getHitsForRowScanNoIndex());
- testColumnScan(conn, hits.getHitsForColumnScan());
- }
-
- }
-
- private static void testColumnScan(HConnection conn, List<Pair<Integer, Integer>> colScans) throws IOException {
- Stats stats = new Stats("COLUMN_SCAN");
-
- HTableInterface table = conn.getTable(TEST_TABLE);
- try {
- stats.markStart();
-
- int nLogicCols = colScans.size();
- int nLogicRows = colScans.get(0).getSecond() - colScans.get(0).getFirst();
-
- Scan[] scans = new Scan[nLogicCols];
- ResultScanner[] scanners = new ResultScanner[nLogicCols];
- for (int i = 0; i < nLogicCols; i++) {
- scans[i] = new Scan();
- scans[i].addFamily(CF);
- scanners[i] = table.getScanner(scans[i]);
- }
- for (int i = 0; i < nLogicRows; i++) {
- for (int c = 0; c < nLogicCols; c++) {
- Result r = scanners[c].next();
- stats.consume(r);
- }
- dot(i, nLogicRows);
- }
-
- stats.markEnd();
- } finally {
- IOUtils.closeQuietly(table);
- }
- }
-
- private static void testRowScanNoIndexFullScan(HConnection conn, boolean[] hits) throws IOException {
- fullScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_FULL"));
- }
-
- private static void testRowScanNoIndexSkipScan(HConnection conn, boolean[] hits) throws IOException {
- jumpScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_SKIP"));
- }
-
- private static void testRowScanWithIndex(HConnection conn, boolean[] hits) throws IOException {
- jumpScan(conn, hits, new Stats("ROW_SCAN_IDX"));
- }
-
- private static void fullScan(HConnection conn, boolean[] hits, Stats stats) throws IOException {
- HTableInterface table = conn.getTable(TEST_TABLE);
- try {
- stats.markStart();
-
- Scan scan = new Scan();
- scan.addFamily(CF);
- ResultScanner scanner = table.getScanner(scan);
- int i = 0;
- for (Result r : scanner) {
- if (hits[i])
- stats.consume(r);
- dot(i, N_ROWS);
- i++;
- }
-
- stats.markEnd();
- } finally {
- IOUtils.closeQuietly(table);
- }
- }
-
- private static void jumpScan(HConnection conn, boolean[] hits, Stats stats) throws IOException {
-
- final int jumpThreshold = 6; // compensate for Scan() overhead, totally by experience
-
- HTableInterface table = conn.getTable(TEST_TABLE);
- try {
-
- stats.markStart();
-
- int i = 0;
- while (i < N_ROWS) {
- int start, end;
- for (start = i; start < N_ROWS; start++) {
- if (hits[start])
- break;
- }
- for (end = start + 1; end < N_ROWS; end++) {
- boolean isEnd = true;
- for (int j = 0; j < jumpThreshold && end + j < N_ROWS; j++)
- if (hits[end + j])
- isEnd = false;
- if (isEnd)
- break;
- }
-
- if (start < N_ROWS) {
- Scan scan = new Scan();
- scan.setStartRow(Bytes.toBytes(start));
- scan.setStopRow(Bytes.toBytes(end));
- scan.addFamily(CF);
- ResultScanner scanner = table.getScanner(scan);
- i = start;
- for (Result r : scanner) {
- stats.consume(r);
- dot(i, N_ROWS);
- i++;
- }
- }
- i = end;
- }
-
- stats.markEnd();
-
- } finally {
- IOUtils.closeQuietly(table);
- }
- }
-
- private static void prepareData(HConnection conn) throws IOException {
- HTableInterface table = conn.getTable(TEST_TABLE);
-
- try {
- // check how many rows existing
- int nRows = 0;
- Scan scan = new Scan();
- scan.setFilter(new KeyOnlyFilter());
- ResultScanner scanner = table.getScanner(scan);
- for (Result r : scanner) {
- r.getRow(); // nothing to do
- nRows++;
- }
-
- if (nRows > 0) {
- System.out.println(nRows + " existing rows");
- if (nRows != N_ROWS)
- throw new IOException("Expect " + N_ROWS + " rows but it is not");
- return;
- }
-
- // insert rows into empty table
- System.out.println("Writing " + N_ROWS + " rows to " + TEST_TABLE);
- long nBytes = 0;
- for (int i = 0; i < N_ROWS; i++) {
- byte[] rowkey = Bytes.toBytes(i);
- Put put = new Put(rowkey);
- byte[] cell = randomBytes();
- put.add(CF, QN, cell);
- table.put(put);
- nBytes += cell.length;
- dot(i, N_ROWS);
- }
- System.out.println();
- System.out.println("Written " + N_ROWS + " rows, " + nBytes + " bytes");
-
- } finally {
- IOUtils.closeQuietly(table);
- }
-
- }
-
- private static void dot(int i, int nRows) {
- if (i % (nRows / 100) == 0)
- System.out.print(".");
- }
-
- private static byte[] randomBytes() {
- byte[] bytes = new byte[CELL_SIZE];
- Random rand = new Random();
- rand.nextBytes(bytes);
- return bytes;
- }
-
- private static void createHTableIfNeeded(HConnection conn, String tableName) throws IOException {
- HBaseAdmin hbase = new HBaseAdmin(conn);
-
- try {
- boolean tableExist = false;
- try {
- hbase.getTableDescriptor(TableName.valueOf(tableName));
- tableExist = true;
- } catch (TableNotFoundException e) {
- }
-
- if (tableExist) {
- System.out.println("HTable '" + tableName + "' already exists");
- return;
- }
-
- System.out.println("Creating HTable '" + tableName + "'");
-
- HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
-
- HColumnDescriptor fd = new HColumnDescriptor(CF);
- fd.setBlocksize(CELL_SIZE);
- desc.addFamily(fd);
- hbase.createTable(desc);
-
- System.out.println("HTable '" + tableName + "' created");
- } finally {
- hbase.close();
- }
- }
-
- static class Hits {
-
- boolean[] hitsForRowScanWithIndex;
- boolean[] hitsForRowScanNoIndex;
- List<Pair<Integer, Integer>> hitsForColumnScan;
-
- public Hits(int nRows, double hitRatio, double indexRatio) {
- Random rand = new Random();
-
- hitsForRowScanWithIndex = new boolean[nRows];
- hitsForRowScanNoIndex = new boolean[nRows];
-
- // for row scan
- int blockSize = (int) (1.0 / indexRatio);
- int nBlocks = nRows / blockSize;
-
- for (int i = 0; i < nBlocks; i++) {
-
- if (rand.nextDouble() < hitRatio) {
- for (int j = 0; j < blockSize; j++) {
- hitsForRowScanNoIndex[i * blockSize + j] = true;
- hitsForRowScanWithIndex[i * blockSize + j] = true;
- }
- } else {
- // case of not hit
- hitsForRowScanNoIndex[i * blockSize] = true;
- }
- }
-
- hitsForColumnScan = Lists.newArrayList();
-
- // for column scan
- int nColumns = 20;
- int logicRows = nRows / nColumns;
- for (int i = 0; i < nColumns; i++) {
- if (rand.nextDouble() < hitRatio) {
- hitsForColumnScan.add(new Pair<Integer, Integer>(i * logicRows, (i + 1) * logicRows));
- }
- }
-
- }
-
- public boolean[] getHitsForRowScanWithIndex() {
- return hitsForRowScanWithIndex;
- }
-
- public boolean[] getHitsForRowScanNoIndex() {
- return hitsForRowScanNoIndex;
- }
-
- public List<Pair<Integer, Integer>> getHitsForColumnScan() {
- return hitsForColumnScan;
- }
- }
-
- static class Stats {
- String name;
- long startTime;
- long endTime;
- long rowsRead;
- long bytesRead;
-
- public Stats(String name) {
- this.name = name;
- }
-
- public void consume(Result r) {
- consume(r, Integer.MAX_VALUE);
- }
-
- private void consume(Result r, int nBytesToConsume) {
- Cell cell = r.getColumnLatestCell(CF, QN);
- byte mix = 0;
- byte[] valueArray = cell.getValueArray();
- int n = Math.min(nBytesToConsume, cell.getValueLength());
- for (int i = 0; i < n; i++) {
- mix ^= valueArray[i];
- bytesRead++;
- }
- discard(mix);
- rowsRead++;
- }
-
- private void discard(byte n) {
- // do nothing
- }
-
- public void markStart() {
- System.out.println(name + " starts");
- startTime = System.currentTimeMillis();
- }
-
- public void markEnd() {
- endTime = System.currentTimeMillis();
- System.out.println();
- System.out.println(name + " ends, " + (endTime - startTime) + " ms, " + rowsRead + " rows read, " + bytesRead + " bytes read");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusChecker.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusChecker.java b/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusChecker.java
deleted file mode 100644
index 6d741aa..0000000
--- a/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusChecker.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.tools;
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xduo
- *
- */
-public class HadoopStatusChecker {
-
- protected static final Logger logger = LoggerFactory.getLogger(HadoopStatusChecker.class);
-
- private final String yarnUrl;
- private final String mrJobID;
- private final StringBuilder output;
- private final KylinConfig config;
-
- public HadoopStatusChecker(String yarnUrl, String mrJobID, StringBuilder output, KylinConfig config) {
- this.yarnUrl = yarnUrl;
- this.mrJobID = mrJobID;
- this.output = output;
- this.config = config;
- }
-
- public JobStepStatusEnum checkStatus() {
- if (null == mrJobID) {
- this.output.append("Skip status check with empty job id..\n");
- return JobStepStatusEnum.WAITING;
- }
- JobStepStatusEnum status = null;
- try {
- boolean useKerberosAuth = config.getKylinUseKerberosAuth();
- final Pair<RMAppState, FinalApplicationStatus> result = new HadoopStatusGetter(yarnUrl, mrJobID).get(useKerberosAuth);
- logger.debug("State of Hadoop job: " + mrJobID + ":" + result.getLeft() + "-" + result.getRight());
- output.append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S").format(new Date()) + " - State of Hadoop job: " + mrJobID + ":" + result.getLeft() + " - " + result.getRight() + "\n");
-
- switch (result.getRight()) {
- case SUCCEEDED:
- status = JobStepStatusEnum.FINISHED;
- break;
- case FAILED:
- status = JobStepStatusEnum.ERROR;
- break;
- case KILLED:
- status = JobStepStatusEnum.KILLED;
- break;
- case UNDEFINED:
- switch (result.getLeft()) {
- case NEW:
- case NEW_SAVING:
- case SUBMITTED:
- case ACCEPTED:
- status = JobStepStatusEnum.WAITING;
- break;
- case RUNNING:
- status = JobStepStatusEnum.RUNNING;
- break;
- case FINAL_SAVING:
- case FINISHING:
- case FINISHED:
- case FAILED:
- case KILLING:
- case KILLED:
- }
- break;
- }
- } catch (Exception e) {
- logger.error("error check status", e);
- output.append("Exception: " + e.getLocalizedMessage() + "\n");
- status = JobStepStatusEnum.ERROR;
- }
-
- return status;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusGetter.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusGetter.java b/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusGetter.java
deleted file mode 100644
index 9035ad4..0000000
--- a/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusGetter.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.tools;
-
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.security.Principal;
-
-import org.apache.commons.httpclient.Header;
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.HttpMethod;
-import org.apache.commons.httpclient.methods.GetMethod;
-import org.apache.commons.httpclient.protocol.Protocol;
-import org.apache.commons.httpclient.protocol.ProtocolSocketFactory;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.http.auth.AuthSchemeProvider;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.Credentials;
-import org.apache.http.client.config.AuthSchemes;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.protocol.HttpClientContext;
-import org.apache.http.config.Lookup;
-import org.apache.http.config.RegistryBuilder;
-import org.apache.http.impl.auth.SPNegoSchemeFactory;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Created by qianzhou on 1/20/15.
- */
-public class HadoopStatusGetter {
-
- private final String mrJobId;
- private final String yarnUrl;
-
- protected static final Logger log = LoggerFactory.getLogger(HadoopStatusChecker.class);
-
- public HadoopStatusGetter(String yarnUrl, String mrJobId) {
- this.yarnUrl = yarnUrl;
- this.mrJobId = mrJobId;
- }
-
- public Pair<RMAppState, FinalApplicationStatus> get(boolean useKerberos) throws IOException {
- String applicationId = mrJobId.replace("job", "application");
- String url = yarnUrl.replace("${job_id}", applicationId);
- String response = useKerberos ? getHttpResponseWithKerberosAuth(url) : getHttpResponse(url);
- JsonNode root = new ObjectMapper().readTree(response);
- RMAppState state = RMAppState.valueOf(root.findValue("state").getTextValue());
- FinalApplicationStatus finalStatus = FinalApplicationStatus.valueOf(root.findValue("finalStatus").getTextValue());
- return Pair.of(state, finalStatus);
- }
-
- private String getHttpResponse(String url) throws IOException {
- HttpClient client = new HttpClient();
-
- String response = null;
- while (response == null) { // follow redirects via 'refresh'
- if (url.startsWith("https://")) {
- registerEasyHttps();
- }
- if (url.contains("anonymous=true") == false) {
- url += url.contains("?") ? "&" : "?";
- url += "anonymous=true";
- }
-
- HttpMethod get = new GetMethod(url);
- get.addRequestHeader("accept", "application/json");
-
- try {
- client.executeMethod(get);
-
- String redirect = null;
- Header h = get.getResponseHeader("Location");
- if (h != null) {
- redirect = h.getValue();
- if (isValidURL(redirect) == false) {
- log.info("Get invalid redirect url, skip it: " + redirect);
- Thread.sleep(1000l);
- continue;
- }
- } else {
- h = get.getResponseHeader("Refresh");
- if (h != null) {
- String s = h.getValue();
- int cut = s.indexOf("url=");
- if (cut >= 0) {
- redirect = s.substring(cut + 4);
-
- if (isValidURL(redirect) == false) {
- log.info("Get invalid redirect url, skip it: " + redirect);
- Thread.sleep(1000l);
- continue;
- }
- }
- }
- }
-
- if (redirect == null) {
- response = get.getResponseBodyAsString();
- log.debug("Job " + mrJobId + " get status check result.\n");
- } else {
- url = redirect;
- log.debug("Job " + mrJobId + " check redirect url " + url + ".\n");
- }
- } catch (InterruptedException e) {
- log.error(e.getMessage());
- } finally {
- get.releaseConnection();
- }
- }
-
- return response;
- }
-
- private static String DEFAULT_KRB5_CONFIG_LOCATION = "/etc/krb5.conf";
- private String getHttpResponseWithKerberosAuth(String url) throws IOException {
-
- // referred from https://stackoverflow.com/questions/24633380/how-do-i-authenticate-with-spnego-kerberos-and-apaches-httpclient
- String krb5ConfigPath = System.getProperty("java.security.krb5.conf");
- if (krb5ConfigPath == null) {
- krb5ConfigPath = DEFAULT_KRB5_CONFIG_LOCATION;
- }
- log.debug("krb5 config file is " + krb5ConfigPath);
-
- boolean skipPortAtKerberosDatabaseLookup = true;
- System.setProperty("java.security.krb5.conf", krb5ConfigPath);
- System.setProperty("sun.security.krb5.debug", "true");
- System.setProperty("javax.security.auth.useSubjectCredsOnly","false");
- Lookup<AuthSchemeProvider> authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create()
- .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory(skipPortAtKerberosDatabaseLookup))
- .build();
-
- CloseableHttpClient client = HttpClients.custom().setDefaultAuthSchemeRegistry(authSchemeRegistry).build();
- HttpClientContext context = HttpClientContext.create();
- BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
- // This may seem odd, but specifying 'null' as principal tells java to use the logged in user's credentials
- Credentials useJaasCreds = new Credentials() {
- public String getPassword() {
- return null;
- }
- public Principal getUserPrincipal() {
- return null;
- }
- };
- credentialsProvider.setCredentials( new AuthScope(null, -1, null), useJaasCreds );
- context.setCredentialsProvider(credentialsProvider);
- String responseString = null;
- int count = 0;
- int MAX_RETRY_TIME = 3;
- while(responseString == null && count ++ < MAX_RETRY_TIME) {
- if (url.startsWith("https://")) {
- registerEasyHttps();
- }
- if (url.contains("anonymous=true") == false) {
- url += url.contains("?") ? "&" : "?";
- url += "anonymous=true";
- }
- HttpGet httpget = new HttpGet(url);
- try {
- httpget.addHeader("accept", "application/json");
- CloseableHttpResponse response = client.execute(httpget,context);
- String redirect = null;
- org.apache.http.Header h = response.getFirstHeader("Location");
- if (h != null) {
- redirect = h.getValue();
- if (isValidURL(redirect) == false) {
- log.info("Get invalid redirect url, skip it: " + redirect);
- Thread.sleep(1000l);
- continue;
- }
- } else {
- h = response.getFirstHeader("Refresh");
- if (h != null) {
- String s = h.getValue();
- int cut = s.indexOf("url=");
- if (cut >= 0) {
- redirect = s.substring(cut + 4);
-
- if (isValidURL(redirect) == false) {
- log.info("Get invalid redirect url, skip it: " + redirect);
- Thread.sleep(1000l);
- continue;
- }
- }
- }
- }
-
- if (redirect == null) {
- responseString = IOUtils.toString(response.getEntity().getContent());
- log.debug("Job " + mrJobId + " get status check result.\n");
- } else {
- url = redirect;
- log.debug("Job " + mrJobId + " check redirect url " + url + ".\n");
- }
- } catch (InterruptedException e) {
- log.error(e.getMessage());
- } finally {
- httpget.releaseConnection();
- }
- }
-
- return responseString;
- }
-
- private static Protocol EASY_HTTPS = null;
-
- private static void registerEasyHttps() {
- // by pass all https issue
- if (EASY_HTTPS == null) {
- EASY_HTTPS = new Protocol("https", (ProtocolSocketFactory) new DefaultSslProtocolSocketFactory(), 443);
- Protocol.registerProtocol("https", EASY_HTTPS);
- }
- }
-
- private static boolean isValidURL(String value) {
- if (StringUtils.isNotEmpty(value)) {
- java.net.URL url;
- try {
- url = new java.net.URL(value);
- } catch (MalformedURLException var5) {
- return false;
- }
-
- return StringUtils.isNotEmpty(url.getProtocol()) && StringUtils.isNotEmpty(url.getHost());
- }
-
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java b/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java
deleted file mode 100644
index 53930e3..0000000
--- a/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.tools;
-
-import java.io.IOException;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Created by honma on 11/11/14.
- */
-@SuppressWarnings("static-access")
-public class HtableAlterMetadataCLI extends AbstractHadoopJob {
-
- private static final Option OPTION_METADATA_KEY = OptionBuilder.withArgName("key").hasArg().isRequired(true).withDescription("The metadata key").create("key");
- private static final Option OPTION_METADATA_VALUE = OptionBuilder.withArgName("value").hasArg().isRequired(true).withDescription("The metadata value").create("value");
-
- protected static final Logger log = LoggerFactory.getLogger(HtableAlterMetadataCLI.class);
-
- String tableName;
- String metadataKey;
- String metadataValue;
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
- try {
- options.addOption(OPTION_HTABLE_NAME);
- options.addOption(OPTION_METADATA_KEY);
- options.addOption(OPTION_METADATA_VALUE);
-
- parseOptions(options, args);
- tableName = getOptionValue(OPTION_HTABLE_NAME);
- metadataKey = getOptionValue(OPTION_METADATA_KEY);
- metadataValue = getOptionValue(OPTION_METADATA_VALUE);
-
- alter();
-
- return 0;
- } catch (Exception e) {
- printUsage(options);
- throw e;
- }
- }
-
- private void alter() throws IOException {
- Configuration conf = HBaseConfiguration.create();
- HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
- HTableDescriptor table = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
-
- hbaseAdmin.disableTable(table.getTableName());
- table.setValue(metadataKey, metadataValue);
- hbaseAdmin.modifyTable(table.getTableName(), table);
- hbaseAdmin.enableTable(table.getTableName());
- hbaseAdmin.close();
- }
-
- public static void main(String[] args) throws Exception {
- int exitCode = ToolRunner.run(new HtableAlterMetadataCLI(), args);
- System.exit(exitCode);
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/tools/OptionsHelper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/OptionsHelper.java b/job/src/main/java/org/apache/kylin/job/tools/OptionsHelper.java
deleted file mode 100644
index 5ed5b35..0000000
--- a/job/src/main/java/org/apache/kylin/job/tools/OptionsHelper.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.tools;
-
-import java.io.File;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-
-/**
- * @author George Song (ysong1)
- *
- */
-public class OptionsHelper {
- private CommandLine commandLine;
-
- public void parseOptions(Options options, String[] args) throws ParseException {
- CommandLineParser parser = new GnuParser();
- commandLine = parser.parse(options, args);
- }
-
- public Option[] getOptions() {
- return commandLine.getOptions();
- }
-
- public String getOptionsAsString() {
- StringBuilder buf = new StringBuilder();
- for (Option option : commandLine.getOptions()) {
- buf.append(" ");
- buf.append(option.getOpt());
- if (option.hasArg()) {
- buf.append("=");
- buf.append(option.getValue());
- }
- }
- return buf.toString();
- }
-
- public String getOptionValue(Option option) {
- return commandLine.getOptionValue(option.getOpt());
- }
-
- public boolean hasOption(Option option) {
- return commandLine.hasOption(option.getOpt());
- }
-
- public void printUsage(String programName, Options options) {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(programName, options);
- }
-
- public static String convertToFileURL(String path) {
- if (File.separatorChar != '/') {
- path = path.replace(File.separatorChar, '/');
- }
-
- return path;
- }
-
-}