You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/03/03 04:31:00 UTC
[13/25] kylin git commit: KYLIN-1311 write rest servers to file
KYLIN-1311 write rest servers to file
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/12c3cc6f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/12c3cc6f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/12c3cc6f
Branch: refs/heads/helix-rebase
Commit: 12c3cc6f01e75c894987d3345039c3d3bf760a88
Parents: 76132a5
Author: shaofengshi <sh...@apache.org>
Authored: Sun Jan 24 21:41:56 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Mar 2 17:27:46 2016 +0800
----------------------------------------------------------------------
build/bin/streaming_build.sh | 9 ++++--
build/bin/streaming_fillgap.sh | 8 +++--
.../org/apache/kylin/common/KylinConfig.java | 31 ++++++++++++++++++++
.../kylin/rest/helix/HelixClusterAdmin.java | 27 ++++++++++-------
4 files changed, 58 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/12c3cc6f/build/bin/streaming_build.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_build.sh b/build/bin/streaming_build.sh
index a96ecc1..20b87d8 100644
--- a/build/bin/streaming_build.sh
+++ b/build/bin/streaming_build.sh
@@ -20,14 +20,17 @@
source /etc/profile
source ~/.bash_profile
-STREAMING=$1
+CUBE_NAME=$1
INTERVAL=$2
DELAY=$3
+MARGIN=$4
+AUTHORIZATION=$5
+KYLIN_HOST=$6
CURRENT_TIME_IN_SECOND=`date +%s`
CURRENT_TIME=$((CURRENT_TIME_IN_SECOND * 1000))
START=$(($CURRENT_TIME - CURRENT_TIME%INTERVAL - DELAY))
END=$(($CURRENT_TIME - CURRENT_TIME%INTERVAL - DELAY + INTERVAL))
ID="$START"_"$END"
-echo "building for ${ID}" >> ${KYLIN_HOME}/logs/build_trace.log
-sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${STREAMING} ${ID} -start ${START} -end ${END} -streaming ${STREAMING}
\ No newline at end of file
+echo "building for ${CUBE_NAME} ${ID}" >> ${KYLIN_HOME}/logs/build_trace.log
+curl --request PUT --data "{\"start\": $START, \"end\": $END }" --header "Authorization: Basic $AUTHORIZATION" --header "Content-Type: application/json" -v ${KYLIN_HOST}/kylin/api/streaming/${CUBE_NAME}/build
http://git-wip-us.apache.org/repos/asf/kylin/blob/12c3cc6f/build/bin/streaming_fillgap.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_fillgap.sh b/build/bin/streaming_fillgap.sh
index 74d9037..31c4886 100644
--- a/build/bin/streaming_fillgap.sh
+++ b/build/bin/streaming_fillgap.sh
@@ -20,8 +20,10 @@
source /etc/profile
source ~/.bash_profile
-streaming=$1
-margin=$2
+CUBE_NAME=$1
+AUTHORIZATION=$2
+KYLIN_HOST=$3
cd ${KYLIN_HOME}
-sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${streaming} fillgap -streaming ${streaming} -fillGap true -margin ${margin}
\ No newline at end of file
+#sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${streaming} fillgap -streaming ${streaming} -fillGap true -margin ${margin}
+curl --request PUT --header "Authorization: Basic $AUTHORIZATION" --header "Content-Type: application/json" -v ${KYLIN_HOST}/kylin/api/streaming/${CUBE_NAME}/fillgap
http://git-wip-us.apache.org/repos/asf/kylin/blob/12c3cc6f/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 81f5827..08fb6dd 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -265,4 +265,35 @@ public class KylinConfig extends KylinConfigBase {
}
}
+ public static void writeOverrideProperties(Properties properties) throws IOException {
+ File propFile = getKylinProperties();
+ File overrideFile = new File(propFile.getParentFile(), propFile.getName() + ".override");
+ overrideFile.createNewFile();
+ FileInputStream fis2 = null;
+ Properties override = new Properties();
+ try {
+ fis2 = new FileInputStream(overrideFile);
+ override.load(fis2);
+ for (Map.Entry<Object, Object> entries : properties.entrySet()) {
+ override.setProperty(entries.getKey().toString(), entries.getValue().toString());
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ IOUtils.closeQuietly(fis2);
+ }
+
+ PrintWriter pw = null;
+ try {
+ pw = new PrintWriter(overrideFile);
+ for (Enumeration e = override.propertyNames(); e.hasMoreElements();) {
+ String key = (String) e.nextElement();
+ pw.println(key + "=" + override.getProperty(key));
+ }
+ pw.close();
+ } finally {
+ IOUtils.closeQuietly(pw);
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/12c3cc6f/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
index 0758ef1..4da9a86 100644
--- a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
+++ b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
@@ -40,8 +40,10 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
@@ -101,7 +103,7 @@ public class HelixClusterAdmin {
addInstance(instanceName, instanceTags);
startInstance(instanceName);
- rebalanceWithTag(instanceTags);
+ rebalanceWithTag(RESOURCE_NAME_JOB_ENGINE, TAG_JOB_ENGINE);
boolean startController = kylinConfig.isClusterController();
if (startController) {
@@ -123,7 +125,7 @@ public class HelixClusterAdmin {
// add job engine as a resource, 1 partition
if (!admin.getResourcesInCluster(clusterName).contains(HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE)) {
- admin.addResource(clusterName, HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.SEMI_AUTO.name());
+ admin.addResource(clusterName, HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.FULL_AUTO.name());
}
}
@@ -134,8 +136,8 @@ public class HelixClusterAdmin {
logger.warn("Resource '" + resourceName + "' already exists in cluster, remove and re-add.");
admin.dropResource(clusterName, resourceName);
}
- admin.addResource(clusterName, resourceName, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.SEMI_AUTO.name());
- admin.rebalance(clusterName, resourceName, 2, "", TAG_STREAM_BUILDER);
+ admin.addResource(clusterName, resourceName, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.FULL_AUTO.name());
+ rebalanceWithTag(resourceName, TAG_STREAM_BUILDER);
}
@@ -161,13 +163,9 @@ public class HelixClusterAdmin {
* Rebalance the resource with the tags
* @param tags
*/
- protected void rebalanceWithTag(List<String> tags) {
- for (String tag : tags) {
- if (tag.equals(TAG_JOB_ENGINE)) {
- List<String> instances = admin.getInstancesInClusterWithTag(clusterName, TAG_JOB_ENGINE);
- admin.rebalance(clusterName, RESOURCE_NAME_JOB_ENGINE, instances.size(), "", tag);
- }
- }
+ protected void rebalanceWithTag(String resourceName, String tag) {
+ List<String> instances = admin.getInstancesInClusterWithTag(clusterName, tag);
+ admin.rebalance(clusterName, resourceName, instances.size(), "", tag);
}
/**
@@ -277,6 +275,13 @@ public class HelixClusterAdmin {
kylinConfig.setProperty("kylin.rest.servers", restServersInCluster);
System.setProperty("kylin.rest.servers", restServersInCluster);
logger.info("kylin.rest.servers update to " + restServersInCluster);
+ Properties properties = new Properties();
+ properties.setProperty("kylin.rest.servers", restServersInCluster);
+ try {
+ KylinConfig.writeOverrideProperties(properties);
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
+ }
Broadcaster.clearCache();
}
}