You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by zh...@apache.org on 2015/10/22 12:06:56 UTC
[4/5] incubator-kylin git commit: KYLIN-1041, Streaming UI
KYLIN-1041, Streaming UI
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/53b383d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/53b383d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/53b383d9
Branch: refs/heads/2.x-staging
Commit: 53b383d9dc814b9fa53feae2adbbdd99e83be6b1
Parents: a8ead00
Author: jiazhong <ji...@ebay.com>
Authored: Fri Sep 25 17:46:11 2015 +0800
Committer: jiazhong <ji...@ebay.com>
Committed: Thu Oct 22 18:05:59 2015 +0800
----------------------------------------------------------------------
.../kylin/common/restclient/Broadcaster.java | 2 +-
.../org/apache/kylin/cube/CubeDescManager.java | 2 +-
.../kylin/engine/streaming/StreamingConfig.java | 9 +
.../engine/streaming/StreamingManager.java | 163 ++-
.../kylin/rest/controller/CubeController.java | 219 +++-
.../kylin/rest/controller/ModelController.java | 2 +-
.../rest/controller/StreamingController.java | 225 ++++
.../apache/kylin/rest/request/CubeRequest.java | 26 +
.../kylin/rest/request/StreamingRequest.java | 41 +
.../apache/kylin/rest/service/BasicService.java | 10 +
.../apache/kylin/rest/service/CacheService.java | 152 ++-
.../kylin/rest/service/KafkaConfigService.java | 94 ++
.../kylin/rest/service/StreamingService.java | 90 ++
server/src/main/resources/kylinSecurity.xml | 2 +
.../kylin/source/kafka/KafkaConfigManager.java | 140 +-
.../kylin/source/kafka/config/KafkaConfig.java | 9 +
webapp/app/index.html | 32 +-
webapp/app/js/app.js | 2 +-
webapp/app/js/controllers/cubeAdvanceSetting.js | 28 +
webapp/app/js/controllers/cubeEdit.js | 1216 ++++++++++--------
webapp/app/js/controllers/cubeFilter.js | 23 -
webapp/app/js/controllers/cubeMeasures.js | 96 ++
webapp/app/js/controllers/cubeSchema.js | 130 +-
webapp/app/js/controllers/cubes.js | 21 +-
webapp/app/js/controllers/modelSchema.js | 429 +++---
webapp/app/js/controllers/sourceMeta.js | 10 +-
webapp/app/js/controllers/streamingConfig.js | 75 ++
.../app/js/controllers/streamingKafkaConfig.js | 23 +
webapp/app/js/model/cubeConfig.js | 3 +-
webapp/app/js/model/cubeListModel.js | 2 +-
webapp/app/js/model/streamingListModel.js | 80 ++
webapp/app/js/model/streamingModel.js | 66 +
webapp/app/js/services/streaming.js | 28 +
webapp/app/partials/cubeDesigner/filter.html | 66 -
webapp/app/partials/cubeDesigner/info.html | 4 +-
.../cubeDesigner/kafkaAdvancedConfig.html | 165 +++
.../partials/cubeDesigner/kafkaBasicConfig.html | 114 ++
webapp/app/partials/cubeDesigner/measures.html | 360 +++---
.../partials/cubeDesigner/streamingConfig.html | 282 ++++
webapp/app/partials/cubes/cube_detail.html | 10 +-
webapp/app/partials/cubes/cubes.html | 7 +
.../modelDesigner/conditions_settings.html | 6 +-
.../app/partials/modelDesigner/data_model.html | 2 +-
.../app/partials/modelDesigner/model_info.html | 2 +-
webapp/app/partials/models/models_tree.html | 4 -
.../app/partials/streaming/streaming_edit.html | 34 +
.../partials/streaming/streaming_schema.html | 63 +
.../app/partials/tables/source_table_tree.html | 2 +
webapp/app/partials/tables/table_detail.html | 113 --
webapp/app/partials/tables/table_load.html | 130 ++
webapp/bower.json | 3 +-
webapp/grunt.json | 2 +
52 files changed, 3473 insertions(+), 1346 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java b/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
index 0b6152f..80ec33c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
@@ -138,7 +138,7 @@ public class Broadcaster {
}
public enum TYPE {
- ALL("all"), CUBE("cube"), CUBE_DESC("cube_desc"), PROJECT("project"), INVERTED_INDEX("inverted_index"), INVERTED_INDEX_DESC("ii_desc"), TABLE("table"), DATA_MODEL("data_model"), HYBRID("hybrid");
+ ALL("all"), CUBE("cube"),STREAMING("streaming"),KAFKA("kafka"),CUBE_DESC("cube_desc"), PROJECT("project"), INVERTED_INDEX("inverted_index"), INVERTED_INDEX_DESC("ii_desc"), TABLE("table"), DATA_MODEL("data_model"), HYBRID("hybrid");
private String text;
TYPE(String text) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
index b5e3eb2..8e75d29 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
@@ -228,7 +228,7 @@ public class CubeDescManager {
* @return
* @throws IOException
*/
- public CubeDesc updateCubeDesc(CubeDesc desc) throws IOException {
+ public CubeDesc updateCubeDesc(CubeDesc desc) throws IOException {
// Validate CubeDesc
if (desc.getUuid() == null || desc.getName() == null) {
throw new IllegalArgumentException();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
index 3b3b4f7..a7c6da0 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
@@ -41,6 +41,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.RootPersistentEntity;
import org.apache.kylin.common.persistence.Serializer;
@@ -98,6 +99,14 @@ public class StreamingConfig extends RootPersistentEntity {
this.margin = margin;
}
+ public String getResourcePath() {
+ return concatResourcePath(name);
+ }
+
+ public static String concatResourcePath(String streamingName) {
+ return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + streamingName + ".json";
+ }
+
@Override
public StreamingConfig clone() {
try {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
index 0e94b04..112379e 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
@@ -40,15 +40,17 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.JsonSerializer;
import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.restclient.Broadcaster;
+import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
+import org.apache.kylin.metadata.MetadataConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,10 +70,21 @@ public class StreamingManager {
// static cached instances
private static final ConcurrentHashMap<KylinConfig, StreamingManager> CACHE = new ConcurrentHashMap<KylinConfig, StreamingManager>();
+ public static final Serializer<StreamingConfig> STREAMING_SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class);
+
+
private KylinConfig config;
- private StreamingManager(KylinConfig config) {
+ // name ==> StreamingConfig
+ private CaseInsensitiveStringCache<StreamingConfig> streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(Broadcaster.TYPE.STREAMING);
+
+ public static void clearCache() {
+ CACHE.clear();
+ }
+
+ private StreamingManager(KylinConfig config) throws IOException {
this.config = config;
+ reloadAllStreaming();
}
private ResourceStore getStore() {
@@ -89,12 +102,16 @@ public class StreamingManager {
if (r != null) {
return r;
}
+ try{
r = new StreamingManager(config);
CACHE.put(config, r);
if (CACHE.size() > 1) {
- logger.warn("More than one cubemanager singleton exist");
+ logger.warn("More than one streamingManager singleton exist");
}
return r;
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to init StreamingManager from " + config, e);
+ }
}
}
@@ -114,32 +131,96 @@ public class StreamingManager {
return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json";
}
- public boolean createOrUpdateKafkaConfig(String name, StreamingConfig config) {
- try {
- getStore().putResource(formatStreamingConfigPath(name), config, StreamingConfig.SERIALIZER);
- return true;
- } catch (IOException e) {
- logger.error("error save resource name:" + name, e);
- throw new RuntimeException("error save resource name:" + name, e);
- }
- }
public StreamingConfig getStreamingConfig(String name) {
- try {
- return getStore().getResource(formatStreamingConfigPath(name), StreamingConfig.class, StreamingConfig.SERIALIZER);
- } catch (IOException e) {
- logger.error("error get resource name:" + name, e);
- throw new RuntimeException("error get resource name:" + name, e);
+ return streamingMap.get(name);
+ }
+
+ public List<StreamingConfig> listAllStreaming() {
+ return new ArrayList<>(streamingMap.values());
+ }
+
+
+ /**
+ * Reload StreamingConfig from resource store It will be triggered by an desc
+ * update event.
+ *
+ * @param name
+ * @throws IOException
+ */
+ public StreamingConfig reloadStreamingConfigLocal(String name) throws IOException {
+
+ // Save Source
+ String path = StreamingConfig.concatResourcePath(name);
+
+ // Reload the StreamingConfig
+ StreamingConfig ndesc = loadStreamingConfigAt(path);
+
+ // Here replace the old one
+ streamingMap.putLocal(ndesc.getName(), ndesc);
+ return ndesc;
+ }
+
+ // remove streamingConfig
+ public void removeStreamingConfig(StreamingConfig streamingConfig) throws IOException {
+ String path = streamingConfig.getResourcePath();
+ getStore().deleteResource(path);
+ streamingMap.remove(streamingConfig.getName());
+ }
+
+
+ public StreamingConfig getConfig(String name) {
+ name = name.toUpperCase();
+ return streamingMap.get(name);
+ }
+
+ public void removeStreamingLocal(String streamingName) {
+ streamingMap.removeLocal(streamingName);
+ }
+
+ /**
+ * Update CubeDesc with the input. Broadcast the event into cluster
+ *
+ * @param desc
+ * @return
+ * @throws IOException
+ */
+ public StreamingConfig updateStreamingConfig(StreamingConfig desc) throws IOException {
+ // Validate CubeDesc
+ if (desc.getUuid() == null || desc.getName() == null) {
+ throw new IllegalArgumentException();
+ }
+ String name = desc.getName();
+ if (!streamingMap.containsKey(name)) {
+ throw new IllegalArgumentException("StreamingConfig '" + name + "' does not exist.");
}
+
+
+ // Save Source
+ String path = desc.getResourcePath();
+ getStore().putResource(path, desc, STREAMING_SERIALIZER);
+
+ // Reload the StreamingConfig
+ StreamingConfig ndesc = loadStreamingConfigAt(path);
+ // Here replace the old one
+ streamingMap.put(ndesc.getName(), desc);
+
+ return ndesc;
}
- public void saveStreamingConfig(StreamingConfig streamingConfig) throws IOException {
+
+ public StreamingConfig saveStreamingConfig(StreamingConfig streamingConfig) throws IOException {
if (streamingConfig == null || StringUtils.isEmpty(streamingConfig.getName())) {
throw new IllegalArgumentException();
}
+ if (streamingMap.containsKey(streamingConfig.getName()))
+ throw new IllegalArgumentException("StreamingConfig '" + streamingConfig.getName() + "' already exists");
+
String path = formatStreamingConfigPath(streamingConfig.getName());
getStore().putResource(path, streamingConfig, StreamingConfig.SERIALIZER);
+ streamingMap.put(streamingConfig.getName(),streamingConfig);
+ return streamingConfig;
}
public long getOffset(String streaming, int shard) {
@@ -199,6 +280,46 @@ public class StreamingManager {
}
}
+ private StreamingConfig loadStreamingConfigAt(String path) throws IOException {
+ ResourceStore store = getStore();
+ StreamingConfig streamingDesc = store.getResource(path, StreamingConfig.class, STREAMING_SERIALIZER);
+
+ if (StringUtils.isBlank(streamingDesc.getName())) {
+ throw new IllegalStateException("StreamingConfig name must not be blank");
+ }
+ return streamingDesc;
+ }
+
+ private void reloadAllStreaming() throws IOException {
+ ResourceStore store = getStore();
+ logger.info("Reloading Streaming Metadata from folder " + store.getReadableResourcePath(ResourceStore.STREAMING_RESOURCE_ROOT));
+
+ streamingMap.clear();
+
+ List<String> paths = store.collectResourceRecursively(ResourceStore.STREAMING_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX);
+ for (String path : paths) {
+ StreamingConfig streamingConfig;
+ try {
+ streamingConfig = loadStreamingConfigAt(path);
+ } catch (Exception e) {
+ logger.error("Error loading streaming desc " + path, e);
+ continue;
+ }
+ if (path.equals(streamingConfig.getResourcePath()) == false) {
+ logger.error("Skip suspicious desc at " + path + ", " + streamingConfig + " should be at " + streamingConfig.getResourcePath());
+ continue;
+ }
+ if (streamingMap.containsKey(streamingConfig.getName())) {
+ logger.error("Dup StreamingConfig name '" + streamingConfig.getName() + "' on path " + path);
+ continue;
+ }
+
+ streamingMap.putLocal(streamingConfig.getName(), streamingConfig);
+ }
+
+ logger.debug("Loaded " + streamingMap.size() + " StreamingConfig(s)");
+ }
+
private final ObjectMapper mapper = new ObjectMapper();
private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(Integer.class), SimpleType.construct(Long.class));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index d61793d..9ce6452 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -34,6 +34,7 @@ import org.apache.kylin.cube.CubeUpdate;
import org.apache.kylin.cube.model.CubeBuildTypeEnum;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.streaming.StreamingConfig;
import org.apache.kylin.job.JobInstance;
import org.apache.kylin.job.JoinedFlatTable;
import org.apache.kylin.job.exception.JobException;
@@ -46,10 +47,14 @@ import org.apache.kylin.rest.exception.NotFoundException;
import org.apache.kylin.rest.request.CubeRequest;
import org.apache.kylin.rest.request.CubeSegmentRequest;
import org.apache.kylin.rest.request.JobBuildRequest;
+import org.apache.kylin.rest.request.StreamingRequest;
import org.apache.kylin.rest.response.GeneralResponse;
import org.apache.kylin.rest.response.HBaseResponse;
import org.apache.kylin.rest.service.CubeService;
import org.apache.kylin.rest.service.JobService;
+import org.apache.kylin.rest.service.KafkaConfigService;
+import org.apache.kylin.rest.service.StreamingService;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,8 +75,6 @@ import com.fasterxml.jackson.databind.JsonMappingException;
/**
* CubeController is defined as Restful API entrance for UI.
- *
- * @author jianliu
*/
@Controller
@RequestMapping(value = "/cubes")
@@ -79,12 +82,18 @@ public class CubeController extends BasicController {
private static final Logger logger = LoggerFactory.getLogger(CubeController.class);
@Autowired
+ private StreamingService streamingService;
+
+ @Autowired
+ private KafkaConfigService kafkaConfigService;
+
+ @Autowired
private CubeService cubeService;
@Autowired
private JobService jobService;
- @RequestMapping(value = "", method = { RequestMethod.GET })
+ @RequestMapping(value = "", method = {RequestMethod.GET})
@ResponseBody
public List<CubeInstance> getCubes(@RequestParam(value = "cubeName", required = false) String cubeName, @RequestParam(value = "modelName", required = false) String modelName, @RequestParam(value = "projectName", required = false) String projectName, @RequestParam(value = "limit", required = false) Integer limit, @RequestParam(value = "offset", required = false) Integer offset) {
return cubeService.getCubes(cubeName, projectName, modelName, limit, offset);
@@ -98,7 +107,7 @@ public class CubeController extends BasicController {
* @throws UnknownHostException
* @throws IOException
*/
- @RequestMapping(value = "/{cubeName}/segs/{segmentName}/sql", method = { RequestMethod.GET })
+ @RequestMapping(value = "/{cubeName}/segs/{segmentName}/sql", method = {RequestMethod.GET})
@ResponseBody
public GeneralResponse getSql(@PathVariable String cubeName, @PathVariable String segmentName) {
CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
@@ -120,7 +129,7 @@ public class CubeController extends BasicController {
* @param notifyList
* @throws IOException
*/
- @RequestMapping(value = "/{cubeName}/notify_list", method = { RequestMethod.PUT })
+ @RequestMapping(value = "/{cubeName}/notify_list", method = {RequestMethod.PUT})
@ResponseBody
public void updateNotifyList(@PathVariable String cubeName, @RequestBody List<String> notifyList) {
CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
@@ -138,7 +147,7 @@ public class CubeController extends BasicController {
}
- @RequestMapping(value = "/{cubeName}/cost", method = { RequestMethod.PUT })
+ @RequestMapping(value = "/{cubeName}/cost", method = {RequestMethod.PUT})
@ResponseBody
public CubeInstance updateCubeCost(@PathVariable String cubeName, @RequestParam(value = "cost") int cost) {
try {
@@ -150,7 +159,7 @@ public class CubeController extends BasicController {
}
}
- @RequestMapping(value = "/{cubeName}/coprocessor", method = { RequestMethod.PUT })
+ @RequestMapping(value = "/{cubeName}/coprocessor", method = {RequestMethod.PUT})
@ResponseBody
public Map<String, Boolean> updateCubeCoprocessor(@PathVariable String cubeName, @RequestParam(value = "force") String force) {
try {
@@ -168,7 +177,7 @@ public class CubeController extends BasicController {
*
* @throws IOException
*/
- @RequestMapping(value = "/{cubeName}/segs/{segmentName}/refresh_lookup", method = { RequestMethod.PUT })
+ @RequestMapping(value = "/{cubeName}/segs/{segmentName}/refresh_lookup", method = {RequestMethod.PUT})
@ResponseBody
public CubeInstance rebuildLookupSnapshot(@PathVariable String cubeName, @PathVariable String segmentName, @RequestParam(value = "lookupTable") String lookupTable) {
try {
@@ -186,7 +195,7 @@ public class CubeController extends BasicController {
* @return
* @throws IOException
*/
- @RequestMapping(value = "/{cubeName}/rebuild", method = { RequestMethod.PUT })
+ @RequestMapping(value = "/{cubeName}/rebuild", method = {RequestMethod.PUT})
@ResponseBody
public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest jobBuildRequest) {
try {
@@ -203,7 +212,7 @@ public class CubeController extends BasicController {
}
}
- @RequestMapping(value = "/{cubeName}/disable", method = { RequestMethod.PUT })
+ @RequestMapping(value = "/{cubeName}/disable", method = {RequestMethod.PUT})
@ResponseBody
public CubeInstance disableCube(@PathVariable String cubeName) {
try {
@@ -221,7 +230,7 @@ public class CubeController extends BasicController {
}
}
- @RequestMapping(value = "/{cubeName}/purge", method = { RequestMethod.PUT })
+ @RequestMapping(value = "/{cubeName}/purge", method = {RequestMethod.PUT})
@ResponseBody
public CubeInstance purgeCube(@PathVariable String cubeName) {
try {
@@ -239,7 +248,7 @@ public class CubeController extends BasicController {
}
}
- @RequestMapping(value = "/{cubeName}/enable", method = { RequestMethod.PUT })
+ @RequestMapping(value = "/{cubeName}/enable", method = {RequestMethod.PUT})
@ResponseBody
public CubeInstance enableCube(@PathVariable String cubeName) {
try {
@@ -256,7 +265,7 @@ public class CubeController extends BasicController {
}
}
- @RequestMapping(value = "/{cubeName}", method = { RequestMethod.DELETE })
+ @RequestMapping(value = "/{cubeName}", method = {RequestMethod.DELETE})
@ResponseBody
public void deleteCube(@PathVariable String cubeName) {
CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
@@ -264,44 +273,126 @@ public class CubeController extends BasicController {
throw new NotFoundException("Cube with name " + cubeName + " not found..");
}
+ //drop related StreamingConfig KafkaConfig if exist
+ try {
+ List<StreamingConfig> configs= streamingService.listAllStreamingConfigs(cubeName);
+ for(StreamingConfig config:configs){
+ try {
+ streamingService.dropStreamingConfig(config);
+ } catch (IOException e) {
+ logger.error(e.getLocalizedMessage(), e);
+ throw new InternalErrorException("Failed to delete StreamingConfig. " + " Caused by: " + e.getMessage(), e);
+ }
+ try {
+ KafkaConfig kfkConfig = kafkaConfigService.getKafkaConfig(config.getName());
+ kafkaConfigService.dropKafkaConfig(kfkConfig);
+ } catch (IOException e) {
+ throw new InternalErrorException("Failed to delete KafkaConfig. " + " Caused by: " + e.getMessage(), e);
+ }
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ //drop Cube
try {
cubeService.deleteCube(cube);
} catch (Exception e) {
logger.error(e.getLocalizedMessage(), e);
throw new InternalErrorException("Failed to delete cube. " + " Caused by: " + e.getMessage(), e);
}
+
}
/**
- *save cubeDesc
+ * save cubeDesc
*
* @return Table metadata array
* @throws IOException
*/
- @RequestMapping(value = "", method = { RequestMethod.POST })
+ @RequestMapping(value = "", method = {RequestMethod.POST})
@ResponseBody
public CubeRequest saveCubeDesc(@RequestBody CubeRequest cubeRequest) {
CubeDesc desc = deserializeCubeDesc(cubeRequest);
if (desc == null) {
+ cubeRequest.setMessage("CubeDesc is null.");
return cubeRequest;
}
-
String name = CubeService.getCubeNameFromDesc(desc.getName());
if (StringUtils.isEmpty(name)) {
logger.info("Cube name should not be empty.");
throw new BadRequestException("Cube name should not be empty.");
}
+ CubeInstance cubeInstance;
+
try {
desc.setUuid(UUID.randomUUID().toString());
String projectName = (null == cubeRequest.getProject()) ? ProjectInstance.DEFAULT_PROJECT_NAME : cubeRequest.getProject();
- cubeService.createCubeAndDesc(name, projectName, desc);
+ cubeInstance = cubeService.createCubeAndDesc(name, projectName, desc);
} catch (Exception e) {
logger.error("Failed to deal with the request.", e);
throw new InternalErrorException(e.getLocalizedMessage(), e);
}
+ //streaming Cube
+ if (cubeRequest.getStreamingCube().equals("true")) {
+ StreamingConfig streamingConfig = deserializeStreamingDesc(cubeRequest);
+ KafkaConfig kafkaConfig = deserializeKafkaDesc(cubeRequest);
+ if (streamingConfig == null) {
+ cubeRequest.setMessage("No StreamingConfig info defined.");
+ return cubeRequest;
+ }
+ if (kafkaConfig == null) {
+ cubeRequest.setMessage("No KafkaConfig info defined.");
+ return cubeRequest;
+ }
+ if (StringUtils.isEmpty(streamingConfig.getName())) {
+ logger.info("StreamingConfig Name should not be empty.");
+ throw new BadRequestException("StremingConfig name should not be empty.");
+ }
+
+ if (StringUtils.isEmpty(kafkaConfig.getName())) {
+ logger.info("KafkaConfig Name should not be empty.");
+ throw new BadRequestException("KafkaConfig name should not be empty.");
+ }
+
+ try {
+ streamingConfig.setUuid(UUID.randomUUID().toString());
+ streamingService.createStreamingConfig(streamingConfig);
+
+ } catch (IOException e) {
+ try {
+ cubeService.deleteCube(cubeInstance);
+ } catch (Exception ex) {
+ throw new InternalErrorException("Failed to rollback on delete cube. " + " Caused by: " + e.getMessage(), ex);
+ }
+ logger.error("Failed to save StreamingConfig:" + e.getLocalizedMessage(), e);
+ throw new InternalErrorException("Failed to save StreamingConfig: " + e.getLocalizedMessage());
+ }
+ try {
+ kafkaConfig.setUuid(UUID.randomUUID().toString());
+ kafkaConfigService.createKafkaConfig(kafkaConfig);
+ } catch (IOException e) {
+ try {
+ streamingService.dropStreamingConfig(streamingConfig);
+ } catch (IOException e1) {
+ throw new InternalErrorException("StreamingConfig is created, but failed to create KafkaConfig: " + e.getLocalizedMessage());
+ }
+
+ try {
+ cubeService.deleteCube(cubeInstance);
+ } catch (Exception ex) {
+ throw new InternalErrorException("Failed to rollback on delete cube. " + " Caused by: " + e.getMessage(), ex);
+ }
+ logger.error("Failed to save KafkaConfig:" + e.getLocalizedMessage(), e);
+ throw new InternalErrorException("Failed to save KafkaConfig: " + e.getLocalizedMessage());
+ }
+
+ }
+
+
cubeRequest.setUuid(desc.getUuid());
cubeRequest.setSuccessful(true);
return cubeRequest;
@@ -314,7 +405,7 @@ public class CubeController extends BasicController {
* @throws JsonProcessingException
* @throws IOException
*/
- @RequestMapping(value = "", method = { RequestMethod.PUT })
+ @RequestMapping(value = "", method = {RequestMethod.PUT})
@ResponseBody
public CubeRequest updateCubeDesc(@RequestBody CubeRequest cubeRequest) throws JsonProcessingException {
@@ -356,6 +447,46 @@ public class CubeController extends BasicController {
logger.warn("Cube " + desc.getName() + " fail to update because " + desc.getError());
updateRequest(cubeRequest, false, omitMessage(desc.getError()));
}
+
+ //streaming Cube
+ if (cubeRequest.getStreamingCube().equals("true")) {
+ StreamingConfig streamingConfig = deserializeStreamingDesc(cubeRequest);
+ KafkaConfig kafkaConfig = deserializeKafkaDesc(cubeRequest);
+ if (streamingConfig == null) {
+ cubeRequest.setMessage("No StreamingConfig info to update.");
+ return cubeRequest;
+ }
+ if (kafkaConfig == null) {
+ cubeRequest.setMessage("No KafkaConfig info to update.");
+ return cubeRequest;
+ }
+ if (StringUtils.isEmpty(streamingConfig.getName())) {
+ logger.info("StreamingConfig Name should not be empty.");
+ throw new BadRequestException("StremingConfig name should not be empty.");
+ }
+
+ if (StringUtils.isEmpty(kafkaConfig.getName())) {
+ logger.info("KafkaConfig Name should not be empty.");
+ throw new BadRequestException("KafkaConfig name should not be empty.");
+ }
+
+ try {
+ streamingService.updateStreamingConfig(streamingConfig);
+
+ } catch (IOException e) {
+ logger.error("Failed to update StreamingConfig:" + e.getLocalizedMessage(), e);
+ throw new InternalErrorException("Failed to update StreamingConfig: " + e.getLocalizedMessage());
+ }
+ try {
+ kafkaConfigService.updateKafkaConfig(kafkaConfig);
+ } catch (IOException e) {
+ logger.error("Failed to update KafkaConfig:" + e.getLocalizedMessage(), e);
+ throw new InternalErrorException("Failed to update KafkaConfig: " + e.getLocalizedMessage());
+ }
+
+ }
+
+
String descData = JsonUtil.writeValueAsIndentString(desc);
cubeRequest.setCubeDescData(descData);
cubeRequest.setSuccessful(true);
@@ -363,12 +494,12 @@ public class CubeController extends BasicController {
}
/**
- *get Hbase Info
+ * get Hbase Info
*
* @return true
* @throws IOException
*/
- @RequestMapping(value = "/{cubeName}/hbase", method = { RequestMethod.GET })
+ @RequestMapping(value = "/{cubeName}/hbase", method = {RequestMethod.GET})
@ResponseBody
public List<HBaseResponse> getHBaseInfo(@PathVariable String cubeName) {
List<HBaseResponse> hbase = new ArrayList<HBaseResponse>();
@@ -405,7 +536,7 @@ public class CubeController extends BasicController {
return hbase;
}
- @RequestMapping(value = "/{cubeName}/segments", method = { RequestMethod.POST })
+ @RequestMapping(value = "/{cubeName}/segments", method = {RequestMethod.POST})
@ResponseBody
public CubeSegmentRequest appendSegment(@PathVariable String cubeName, @RequestBody CubeSegmentRequest cubeSegmentRequest) {
CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
@@ -470,12 +601,49 @@ public class CubeController extends BasicController {
return desc;
}
+ private StreamingConfig deserializeStreamingDesc(CubeRequest cubeRequest) {
+ StreamingConfig desc = null;
+ try {
+ logger.debug("Saving StreamingConfig " + cubeRequest.getStreamingData());
+ desc = JsonUtil.readValue(cubeRequest.getStreamingData(), StreamingConfig.class);
+ } catch (JsonParseException e) {
+ logger.error("The StreamingConfig definition is not valid.", e);
+ updateRequest(cubeRequest, false, e.getMessage());
+ } catch (JsonMappingException e) {
+ logger.error("The data StreamingConfig definition is not valid.", e);
+ updateRequest(cubeRequest, false, e.getMessage());
+ } catch (IOException e) {
+ logger.error("Failed to deal with the request.", e);
+ throw new InternalErrorException("Failed to deal with the request:" + e.getMessage(), e);
+ }
+ return desc;
+ }
+
+
+ private KafkaConfig deserializeKafkaDesc(CubeRequest cubeRequest) {
+ KafkaConfig desc = null;
+ try {
+ logger.debug("Saving KafkaConfig " + cubeRequest.getKafkaData());
+ desc = JsonUtil.readValue(cubeRequest.getKafkaData(), KafkaConfig.class);
+ } catch (JsonParseException e) {
+ logger.error("The KafkaConfig definition is not valid.", e);
+ updateRequest(cubeRequest, false, e.getMessage());
+ } catch (JsonMappingException e) {
+ logger.error("The data KafkaConfig definition is not valid.", e);
+ updateRequest(cubeRequest, false, e.getMessage());
+ } catch (IOException e) {
+ logger.error("Failed to deal with the request.", e);
+ throw new InternalErrorException("Failed to deal with the request:" + e.getMessage(), e);
+ }
+ return desc;
+ }
+
/**
* @return
*/
private String omitMessage(List<String> errors) {
StringBuffer buffer = new StringBuffer();
- for (Iterator<String> iterator = errors.iterator(); iterator.hasNext();) {
+ for (Iterator<String> iterator = errors.iterator(); iterator.hasNext(); ) {
String string = (String) iterator.next();
buffer.append(string);
buffer.append("\n");
@@ -497,4 +665,11 @@ public class CubeController extends BasicController {
this.jobService = jobService;
}
+ public void setStreamingService(StreamingService streamingService) {
+ this.streamingService = streamingService;
+ }
+
+ public void setKafkaConfigService(KafkaConfigService kafkaConfigService) {
+ this.kafkaConfigService = kafkaConfigService;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/java/org/apache/kylin/rest/controller/ModelController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/ModelController.java b/server/src/main/java/org/apache/kylin/rest/controller/ModelController.java
index b8af9d6..a409938 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/ModelController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/ModelController.java
@@ -117,7 +117,7 @@ public class ModelController extends BasicController {
try {
modelDesc = modelService.updateModelAndDesc(modelDesc);
} catch (AccessDeniedException accessDeniedException) {
- throw new ForbiddenException("You don't have right to update this cube.");
+ throw new ForbiddenException("You don't have right to update this model.");
} catch (Exception e) {
logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
new file mode 100644
index 0000000..78b63c0
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.controller;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.rest.exception.BadRequestException;
+import org.apache.kylin.rest.exception.ForbiddenException;
+import org.apache.kylin.rest.exception.InternalErrorException;
+import org.apache.kylin.rest.exception.NotFoundException;
+import org.apache.kylin.rest.request.StreamingRequest;
+import org.apache.kylin.rest.service.KafkaConfigService;
+import org.apache.kylin.rest.service.StreamingService;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.security.access.AccessDeniedException;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.*;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * StreamingController is defined as Restful API entrance for UI.
+ *
+ * @author jiazhong
+ */
+@Controller
+@RequestMapping(value = "/streaming")
+public class StreamingController extends BasicController {
+ private static final Logger logger = LoggerFactory.getLogger(StreamingController.class);
+
+ @Autowired
+ private StreamingService streamingService;
+ @Autowired
+ private KafkaConfigService kafkaConfigService;
+
+ @RequestMapping(value = "/getConfig", method = { RequestMethod.GET })
+ @ResponseBody
+ public List<StreamingConfig> getStreamings(@RequestParam(value = "cubeName", required = false) String cubeName, @RequestParam(value = "limit", required = false) Integer limit, @RequestParam(value = "offset", required = false) Integer offset) {
+ try {
+ return streamingService.getStreamingConfigs(cubeName, limit, offset);
+ } catch (IOException e) {
+ logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
+ throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage());
+ }
+ }
+
+ @RequestMapping(value = "/getKfkConfig", method = { RequestMethod.GET })
+ @ResponseBody
+ public List<KafkaConfig> getKafkaConfigs(@RequestParam(value = "kafkaConfigName", required = false) String kafkaConfigName, @RequestParam(value = "limit", required = false) Integer limit, @RequestParam(value = "offset", required = false) Integer offset) {
+ try {
+ return kafkaConfigService.getKafkaConfigs(kafkaConfigName, limit, offset);
+ } catch (IOException e) {
+ logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
+ throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage());
+ }
+ }
+
+
+ /**
+ *
+ * create Streaming Schema
+ * @throws java.io.IOException
+ */
+ @RequestMapping(value = "", method = { RequestMethod.POST })
+ @ResponseBody
+ public StreamingRequest saveStreamingConfig(@RequestBody StreamingRequest streamingRequest) {
+ //Update Model
+ StreamingConfig streamingConfig = deserializeSchemalDesc(streamingRequest);
+ KafkaConfig kafkaConfig = deserializeKafkaSchemalDesc(streamingRequest);
+ if (streamingConfig == null ||kafkaConfig == null) {
+ return streamingRequest;
+ }
+ if (StringUtils.isEmpty(streamingConfig.getName())) {
+ logger.info("StreamingConfig should not be empty.");
+ throw new BadRequestException("StremingConfig name should not be empty.");
+ }
+
+ try {
+ streamingConfig.setUuid(UUID.randomUUID().toString());
+ streamingService.createStreamingConfig(streamingConfig);
+
+ } catch (IOException e) {
+ logger.error("Failed to save StreamingConfig:" + e.getLocalizedMessage(), e);
+ throw new InternalErrorException("Failed to save StreamingConfig: " + e.getLocalizedMessage());
+ }
+ try {
+ kafkaConfig.setUuid(UUID.randomUUID().toString());
+ kafkaConfigService.createKafkaConfig(kafkaConfig);
+ }catch (IOException e){
+ try {
+ streamingService.dropStreamingConfig(streamingConfig);
+ } catch (IOException e1) {
+ throw new InternalErrorException("StreamingConfig is created, but failed to create KafkaConfig: " + e.getLocalizedMessage());
+ }
+ logger.error("Failed to save KafkaConfig:" + e.getLocalizedMessage(), e);
+ throw new InternalErrorException("Failed to save KafkaConfig: " + e.getLocalizedMessage());
+ }
+ streamingRequest.setSuccessful(true);
+ return streamingRequest;
+ }
+
+ @RequestMapping(value = "", method = { RequestMethod.PUT })
+ @ResponseBody
+ public StreamingRequest updateModelDesc(@RequestBody StreamingRequest streamingRequest) throws JsonProcessingException {
+ StreamingConfig streamingConfig = deserializeSchemalDesc(streamingRequest);
+ KafkaConfig kafkaConfig = deserializeKafkaSchemalDesc(streamingRequest);
+
+ if (streamingConfig == null) {
+ return streamingRequest;
+ }
+ try {
+ streamingConfig = streamingService.updateStreamingConfig(streamingConfig);
+ } catch (AccessDeniedException accessDeniedException) {
+ throw new ForbiddenException("You don't have right to update this StreamingConfig.");
+ } catch (Exception e) {
+ logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
+ throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage());
+ }
+ try {
+ kafkaConfig = kafkaConfigService.updateKafkaConfig(kafkaConfig);
+ }catch (AccessDeniedException accessDeniedException) {
+ throw new ForbiddenException("You don't have right to update this KafkaConfig.");
+ } catch (Exception e) {
+ logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
+ throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage());
+ }
+
+ streamingRequest.setSuccessful(true);
+
+ return streamingRequest;
+ }
+
+ @RequestMapping(value = "/{configName}", method = { RequestMethod.DELETE })
+ @ResponseBody
+ public void deleteConfig(@PathVariable String configName) throws IOException {
+ StreamingConfig config = streamingService.getSreamingManager().getStreamingConfig(configName);
+ KafkaConfig kafkaConfig = kafkaConfigService.getKafkaConfig(configName);
+ if (null == config) {
+ throw new NotFoundException("StreamingConfig with name " + configName + " not found..");
+ }
+ try {
+ streamingService.dropStreamingConfig(config);
+ kafkaConfigService.dropKafkaConfig(kafkaConfig);
+ } catch (Exception e) {
+ logger.error(e.getLocalizedMessage(), e);
+ throw new InternalErrorException("Failed to delete StreamingConfig. " + " Caused by: " + e.getMessage(), e);
+ }
+ }
+
+ private StreamingConfig deserializeSchemalDesc(StreamingRequest streamingRequest) {
+ StreamingConfig desc = null;
+ try {
+ logger.debug("Saving StreamingConfig " + streamingRequest.getStreamingConfig());
+ desc = JsonUtil.readValue(streamingRequest.getStreamingConfig(), StreamingConfig.class);
+ } catch (JsonParseException e) {
+ logger.error("The StreamingConfig definition is not valid.", e);
+ updateRequest(streamingRequest, false, e.getMessage());
+ } catch (JsonMappingException e) {
+ logger.error("The data StreamingConfig definition is not valid.", e);
+ updateRequest(streamingRequest, false, e.getMessage());
+ } catch (IOException e) {
+ logger.error("Failed to deal with the request.", e);
+ throw new InternalErrorException("Failed to deal with the request:" + e.getMessage(), e);
+ }
+ return desc;
+ }
+
+
+ private KafkaConfig deserializeKafkaSchemalDesc(StreamingRequest streamingRequest) {
+ KafkaConfig desc = null;
+ try {
+ logger.debug("Saving KafkaConfig " + streamingRequest.getKafkaConfig());
+ desc = JsonUtil.readValue(streamingRequest.getKafkaConfig(), KafkaConfig.class);
+ } catch (JsonParseException e) {
+ logger.error("The KafkaConfig definition is not valid.", e);
+ updateRequest(streamingRequest, false, e.getMessage());
+ } catch (JsonMappingException e) {
+ logger.error("The data KafkaConfig definition is not valid.", e);
+ updateRequest(streamingRequest, false, e.getMessage());
+ } catch (IOException e) {
+ logger.error("Failed to deal with the request.", e);
+ throw new InternalErrorException("Failed to deal with the request:" + e.getMessage(), e);
+ }
+ return desc;
+ }
+
+ private void updateRequest(StreamingRequest request, boolean success, String message) {
+ request.setSuccessful(success);
+ request.setMessage(message);
+ }
+
+ public void setStreamingService(StreamingService streamingService) {
+ this.streamingService= streamingService;
+ }
+
+ public void setKafkaConfigService(KafkaConfigService kafkaConfigService) {
+ this.kafkaConfigService = kafkaConfigService;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/java/org/apache/kylin/rest/request/CubeRequest.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/request/CubeRequest.java b/server/src/main/java/org/apache/kylin/rest/request/CubeRequest.java
index 1371a4f..fb9952b 100644
--- a/server/src/main/java/org/apache/kylin/rest/request/CubeRequest.java
+++ b/server/src/main/java/org/apache/kylin/rest/request/CubeRequest.java
@@ -23,9 +23,12 @@ public class CubeRequest {
private String uuid;
private String cubeName;
private String cubeDescData;
+ private String streamingData;
+ private String kafkaData;
private boolean successful;
private String message;
private String project;
+ private String streamingCube;
public String getUuid() {
return uuid;
@@ -104,4 +107,27 @@ public class CubeRequest {
this.project = project;
}
+ public String getStreamingCube() {
+ return streamingCube;
+ }
+
+ public void setStreamingCube(String streamingCube) {
+ this.streamingCube = streamingCube;
+ }
+
+ public String getStreamingData() {
+ return streamingData;
+ }
+
+ public void setStreamingData(String streamingData) {
+ this.streamingData = streamingData;
+ }
+
+ public String getKafkaData() {
+ return kafkaData;
+ }
+
+ public void setKafkaData(String kafkaData) {
+ this.kafkaData = kafkaData;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java b/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java
index 3bcf9d7..07c30f3 100644
--- a/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java
+++ b/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java
@@ -25,6 +25,14 @@ import java.lang.String;public class StreamingRequest {
private String tableData;
+ private String streamingConfig;
+
+ private String kafkaConfig;
+
+ private boolean successful;
+
+ private String message;
+
public String getProject() {
return project;
}
@@ -40,4 +48,37 @@ import java.lang.String;public class StreamingRequest {
public void setTableData(String tableData) {
this.tableData = tableData;
}
+
+ public boolean isSuccessful() {
+ return successful;
+ }
+
+ public void setSuccessful(boolean successful) {
+ this.successful = successful;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ public String getStreamingConfig() {
+ return streamingConfig;
+ }
+
+ public void setStreamingConfig(String streamingConfig) {
+ this.streamingConfig = streamingConfig;
+ }
+
+ public String getKafkaConfig() {
+ return kafkaConfig;
+ }
+
+ public void setKafkaConfig(String kafkaConfig) {
+ this.kafkaConfig = kafkaConfig;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
index 70deada..5ac12ea 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
@@ -43,6 +43,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.streaming.StreamingManager;
import org.apache.kylin.invertedindex.IIDescManager;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.job.execution.AbstractExecutable;
@@ -56,6 +57,7 @@ import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.query.enumerator.OLAPQuery;
import org.apache.kylin.query.relnode.OLAPContext;
import org.apache.kylin.query.schema.OLAPSchemaFactory;
+import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.storage.hybrid.HybridManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -167,6 +169,14 @@ public abstract class BasicService {
return CubeManager.getInstance(getConfig());
}
+ public final StreamingManager getSreamingManager() {
+ return StreamingManager.getInstance(getConfig());
+ }
+
+ public final KafkaConfigManager getKafkaManager() throws IOException {
+ return KafkaConfigManager.getInstance(getConfig());
+ }
+
public final CubeDescManager getCubeDescManager() {
return CubeDescManager.getInstance(getConfig());
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
index 45cff4d..98ddc6f 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -28,6 +28,7 @@ import org.apache.kylin.common.restclient.Broadcaster;
import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.streaming.StreamingManager;
import org.apache.kylin.invertedindex.IIDescManager;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.metadata.MetadataManager;
@@ -35,6 +36,7 @@ import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.metadata.realization.RealizationRegistry;
import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.storage.hybrid.HybridManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,54 +77,62 @@ public class CacheService extends BasicService {
logger.info(log);
try {
switch (cacheType) {
- case CUBE:
- CubeInstance newCube = getCubeManager().reloadCubeLocal(cacheKey);
- getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cacheKey);
- getProjectManager().clearL2Cache();
- //clean query related cache first
- super.cleanDataCache(newCube.getUuid());
- cubeService.updateOnNewSegmentReady(cacheKey);
- break;
- case CUBE_DESC:
- getCubeDescManager().reloadCubeDescLocal(cacheKey);
- break;
- case PROJECT:
- ProjectInstance projectInstance = getProjectManager().reloadProjectLocal(cacheKey);
- removeOLAPDataSource(projectInstance.getName());
- break;
- case INVERTED_INDEX:
- //II update does not need to update storage cache because it is dynamic already
- getIIManager().reloadIILocal(cacheKey);
- getHybridManager().reloadHybridInstanceByChild(RealizationType.INVERTED_INDEX, cacheKey);
- getProjectManager().clearL2Cache();
- break;
- case INVERTED_INDEX_DESC:
- getIIDescManager().reloadIIDescLocal(cacheKey);
- break;
- case TABLE:
- getMetadataManager().reloadTableCache(cacheKey);
- IIDescManager.clearCache();
- CubeDescManager.clearCache();
- break;
- case DATA_MODEL:
- getMetadataManager().reloadDataModelDesc(cacheKey);
- IIDescManager.clearCache();
- CubeDescManager.clearCache();
- break;
- case ALL:
- MetadataManager.clearCache();
- CubeDescManager.clearCache();
- CubeManager.clearCache();
- IIDescManager.clearCache();
- IIManager.clearCache();
- HybridManager.clearCache();
- RealizationRegistry.clearCache();
- ProjectManager.clearCache();
- super.cleanAllDataCache();
- BasicService.removeAllOLAPDataSources();
- break;
- default:
- throw new RuntimeException("invalid cacheType:" + cacheType);
+ case CUBE:
+ CubeInstance newCube = getCubeManager().reloadCubeLocal(cacheKey);
+ getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cacheKey);
+ getProjectManager().clearL2Cache();
+ //clean query related cache first
+ super.cleanDataCache(newCube.getUuid());
+ cubeService.updateOnNewSegmentReady(cacheKey);
+ break;
+ case STREAMING:
+ getSreamingManager().reloadStreamingConfigLocal(cacheKey);
+ break;
+ case KAFKA:
+ getKafkaManager().reloadKafkaConfigLocal(cacheKey);
+ break;
+ case CUBE_DESC:
+ getCubeDescManager().reloadCubeDescLocal(cacheKey);
+ break;
+ case PROJECT:
+ ProjectInstance projectInstance = getProjectManager().reloadProjectLocal(cacheKey);
+ removeOLAPDataSource(projectInstance.getName());
+ break;
+ case INVERTED_INDEX:
+ //II update does not need to update storage cache because it is dynamic already
+ getIIManager().reloadIILocal(cacheKey);
+ getHybridManager().reloadHybridInstanceByChild(RealizationType.INVERTED_INDEX, cacheKey);
+ getProjectManager().clearL2Cache();
+ break;
+ case INVERTED_INDEX_DESC:
+ getIIDescManager().reloadIIDescLocal(cacheKey);
+ break;
+ case TABLE:
+ getMetadataManager().reloadTableCache(cacheKey);
+ IIDescManager.clearCache();
+ CubeDescManager.clearCache();
+ break;
+ case DATA_MODEL:
+ getMetadataManager().reloadDataModelDesc(cacheKey);
+ IIDescManager.clearCache();
+ CubeDescManager.clearCache();
+ break;
+ case ALL:
+ MetadataManager.clearCache();
+ CubeDescManager.clearCache();
+ CubeManager.clearCache();
+ IIDescManager.clearCache();
+ IIManager.clearCache();
+ HybridManager.clearCache();
+ RealizationRegistry.clearCache();
+ ProjectManager.clearCache();
+ KafkaConfigManager.clearCache();
+ StreamingManager.clearCache();
+ super.cleanAllDataCache();
+ BasicService.removeAllOLAPDataSources();
+ break;
+ default:
+ throw new RuntimeException("invalid cacheType:" + cacheType);
}
} catch (IOException e) {
throw new RuntimeException("error " + log, e);
@@ -133,29 +143,29 @@ public class CacheService extends BasicService {
final String log = "remove cache type: " + cacheType + " name:" + cacheKey;
try {
switch (cacheType) {
- case CUBE:
- String storageUUID = getCubeManager().getCube(cacheKey).getUuid();
- getCubeManager().removeCubeLocal(cacheKey);
- super.cleanDataCache(storageUUID);
- break;
- case CUBE_DESC:
- getCubeDescManager().removeLocalCubeDesc(cacheKey);
- break;
- case PROJECT:
- ProjectManager.clearCache();
- break;
- case INVERTED_INDEX:
- getIIManager().removeIILocal(cacheKey);
- break;
- case INVERTED_INDEX_DESC:
- getIIDescManager().removeIIDescLocal(cacheKey);
- break;
- case TABLE:
- throw new UnsupportedOperationException(log);
- case DATA_MODEL:
- throw new UnsupportedOperationException(log);
- default:
- throw new RuntimeException("invalid cacheType:" + cacheType);
+ case CUBE:
+ String storageUUID = getCubeManager().getCube(cacheKey).getUuid();
+ getCubeManager().removeCubeLocal(cacheKey);
+ super.cleanDataCache(storageUUID);
+ break;
+ case CUBE_DESC:
+ getCubeDescManager().removeLocalCubeDesc(cacheKey);
+ break;
+ case PROJECT:
+ ProjectManager.clearCache();
+ break;
+ case INVERTED_INDEX:
+ getIIManager().removeIILocal(cacheKey);
+ break;
+ case INVERTED_INDEX_DESC:
+ getIIDescManager().removeIIDescLocal(cacheKey);
+ break;
+ case TABLE:
+ throw new UnsupportedOperationException(log);
+ case DATA_MODEL:
+ throw new UnsupportedOperationException(log);
+ default:
+ throw new RuntimeException("invalid cacheType:" + cacheType);
}
} catch (IOException e) {
throw new RuntimeException("error " + log, e);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/java/org/apache/kylin/rest/service/KafkaConfigService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/KafkaConfigService.java b/server/src/main/java/org/apache/kylin/rest/service/KafkaConfigService.java
new file mode 100644
index 0000000..a0b19b2
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/service/KafkaConfigService.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.service;
+
+import org.apache.kylin.rest.constant.Constant;
+import org.apache.kylin.rest.exception.InternalErrorException;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.security.access.prepost.PostFilter;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+@Component("kafkaMgmtService")
+public class KafkaConfigService extends BasicService {
+
+ @Autowired
+ private AccessService accessService;
+
+ @PostFilter(Constant.ACCESS_POST_FILTER_READ)
+ public List<KafkaConfig> listAllKafkaConfigs(final String kafkaConfigName) throws IOException {
+ List<KafkaConfig> kafkaConfigs = new ArrayList<KafkaConfig>();
+// CubeInstance cubeInstance = (null != cubeName) ? getCubeManager().getCube(cubeName) : null;
+ if (null == kafkaConfigName) {
+ kafkaConfigs = getKafkaManager().listAllKafkaConfigs();
+ } else {
+ List<KafkaConfig> configs = getKafkaManager().listAllKafkaConfigs();
+ for(KafkaConfig config : configs){
+ if(kafkaConfigName.equals(config.getName())){
+ kafkaConfigs.add(config);
+ }
+ }
+ }
+
+ return kafkaConfigs;
+ }
+
+ public List<KafkaConfig> getKafkaConfigs(final String kafkaConfigName, final Integer limit, final Integer offset) throws IOException {
+
+ List<KafkaConfig> kafkaConfigs;
+ kafkaConfigs = listAllKafkaConfigs(kafkaConfigName);
+
+ if (limit == null || offset == null) {
+ return kafkaConfigs;
+ }
+
+ if ((kafkaConfigs.size() - offset) < limit) {
+ return kafkaConfigs.subList(offset, kafkaConfigs.size());
+ }
+
+ return kafkaConfigs.subList(offset, offset + limit);
+ }
+
+
+
+ public KafkaConfig createKafkaConfig(KafkaConfig config) throws IOException {
+ if (getKafkaManager().getKafkaConfig(config.getName()) != null) {
+ throw new InternalErrorException("The kafkaConfig named " + config.getName() + " already exists");
+ }
+ getKafkaManager().createKafkaConfig(config.getName(),config);
+ return config;
+ }
+
+// @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
+ public KafkaConfig updateKafkaConfig(KafkaConfig config) throws IOException {
+ return getKafkaManager().updateKafkaConfig(config);
+ }
+
+ public KafkaConfig getKafkaConfig(String configName) throws IOException {
+ return getKafkaManager().getKafkaConfig(configName);
+ }
+// @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
+ public void dropKafkaConfig(KafkaConfig config) throws IOException {
+ getKafkaManager().removeKafkaConfig(config);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
new file mode 100644
index 0000000..0f67ac8
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.service;
+
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.rest.constant.Constant;
+import org.apache.kylin.rest.exception.InternalErrorException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.security.access.prepost.PostFilter;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+@Component("streamingMgmtService")
+public class StreamingService extends BasicService {
+
+ @Autowired
+ private AccessService accessService;
+
+ @PostFilter(Constant.ACCESS_POST_FILTER_READ)
+ public List<StreamingConfig> listAllStreamingConfigs(final String cubeName) throws IOException {
+ List<StreamingConfig> streamingConfigs = new ArrayList();
+ CubeInstance cubeInstance = (null != cubeName) ? getCubeManager().getCube(cubeName) : null;
+ if (null == cubeInstance) {
+ streamingConfigs = getSreamingManager().listAllStreaming();
+ } else {
+ for(StreamingConfig config : getSreamingManager().listAllStreaming()){
+ if(cubeInstance.getName().equals(config.getCubeName())){
+ streamingConfigs.add(config);
+ }
+ }
+ }
+
+ return streamingConfigs;
+ }
+
+ public List<StreamingConfig> getStreamingConfigs(final String cubeName, final Integer limit, final Integer offset) throws IOException {
+
+ List<StreamingConfig> streamingConfigs;
+ streamingConfigs = listAllStreamingConfigs(cubeName);
+
+ if (limit == null || offset == null) {
+ return streamingConfigs;
+ }
+
+ if ((streamingConfigs.size() - offset) < limit) {
+ return streamingConfigs.subList(offset, streamingConfigs.size());
+ }
+
+ return streamingConfigs.subList(offset, offset + limit);
+ }
+
+ public StreamingConfig createStreamingConfig(StreamingConfig config) throws IOException {
+ if (getSreamingManager().getStreamingConfig(config.getName()) != null) {
+ throw new InternalErrorException("The streamingConfig named " + config.getName() + " already exists");
+ }
+ StreamingConfig streamingConfig = getSreamingManager().saveStreamingConfig(config);
+ return streamingConfig;
+ }
+
+// @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
+ public StreamingConfig updateStreamingConfig(StreamingConfig config) throws IOException {
+ return getSreamingManager().updateStreamingConfig(config);
+ }
+
+// @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
+ public void dropStreamingConfig(StreamingConfig config) throws IOException {
+ getSreamingManager().removeStreamingConfig(config);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/resources/kylinSecurity.xml
----------------------------------------------------------------------
diff --git a/server/src/main/resources/kylinSecurity.xml b/server/src/main/resources/kylinSecurity.xml
index 518a9bd..ee3c891 100644
--- a/server/src/main/resources/kylinSecurity.xml
+++ b/server/src/main/resources/kylinSecurity.xml
@@ -19,6 +19,8 @@
<scr:intercept-url pattern="/api/cache*/**" access="permitAll" />
<scr:intercept-url pattern="/api/cubes/src/tables" access="hasAnyRole('ROLE_ANALYST')" />
<scr:intercept-url pattern="/api/cubes*/**" access="isAuthenticated()" />
+ <scr:intercept-url pattern="/api/models*/**" access="isAuthenticated()" />
+ <scr:intercept-url pattern="/api/streaming*/**" access="isAuthenticated()" />
<scr:intercept-url pattern="/api/job*/**" access="isAuthenticated()" />
<scr:intercept-url pattern="/api/admin/config" access="permitAll" />
<scr:intercept-url pattern="/api/projects" access="permitAll" />
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index d8f9f50..3032d13 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -40,13 +40,21 @@ import com.fasterxml.jackson.databind.type.MapType;
import com.fasterxml.jackson.databind.type.SimpleType;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.JsonSerializer;
import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.restclient.Broadcaster;
+import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -60,15 +68,25 @@ public class KafkaConfigManager {
private KylinConfig config;
- private KafkaConfigManager(KylinConfig config) {
+ // name ==> StreamingConfig
+ private CaseInsensitiveStringCache<KafkaConfig> kafkaMap = new CaseInsensitiveStringCache<KafkaConfig>(Broadcaster.TYPE.KAFKA);
+
+ public static final Serializer<KafkaConfig> KAFKA_SERIALIZER = new JsonSerializer<KafkaConfig>(KafkaConfig.class);
+
+ public static void clearCache() {
+ CACHE.clear();
+ }
+
+ private KafkaConfigManager(KylinConfig config) throws IOException {
this.config = config;
+ reloadAllKafkaConfig();
}
private ResourceStore getStore() {
return ResourceStore.getStore(this.config);
}
- public static KafkaConfigManager getInstance(KylinConfig config) {
+ public static KafkaConfigManager getInstance(KylinConfig config){
KafkaConfigManager r = CACHE.get(config);
if (r != null) {
return r;
@@ -79,15 +97,43 @@ public class KafkaConfigManager {
if (r != null) {
return r;
}
+ try{
r = new KafkaConfigManager(config);
CACHE.put(config, r);
if (CACHE.size() > 1) {
- logger.warn("More than one cubemanager singleton exist");
+ logger.warn("More than one KafkaConfigManager singleton exist");
}
return r;
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to init KafkaConfigManager from " + config, e);
+ }
}
}
+ public List<KafkaConfig> listAllKafkaConfigs() {
+ return new ArrayList(kafkaMap.values());
+ }
+
+ /**
+ * Reload KafkaConfig from resource store It will be triggered by an desc
+ * update event.
+ *
+ * @param name
+ * @throws IOException
+ */
+ public KafkaConfig reloadKafkaConfigLocal(String name) throws IOException {
+
+ // Save Source
+ String path = KafkaConfig.getKafkaResourcePath(name);
+
+ // Reload the KafkaConfig
+ KafkaConfig ndesc = loadKafkaConfigAt(path);
+
+ // Here replace the old one
+ kafkaMap.putLocal(ndesc.getName(), ndesc);
+ return ndesc;
+ }
+
private boolean checkExistence(String name) {
return true;
}
@@ -96,9 +142,17 @@ public class KafkaConfigManager {
return ResourceStore.KAfKA_RESOURCE_ROOT + "/" + name + ".json";
}
- public boolean createOrUpdateKafkaConfig(String name, KafkaConfig config) {
+ public boolean createKafkaConfig(String name, KafkaConfig config) {
+
+ if (config == null || StringUtils.isEmpty(config.getName())) {
+ throw new IllegalArgumentException();
+ }
+
+ if (kafkaMap.containsKey(config.getName()))
+ throw new IllegalArgumentException("KafkaConfig '" + config.getName() + "' already exists");
try {
getStore().putResource(formatStreamingConfigPath(name), config, KafkaConfig.SERIALIZER);
+ kafkaMap.put(config.getName(), config);
return true;
} catch (IOException e) {
logger.error("error save resource name:" + name, e);
@@ -106,13 +160,41 @@ public class KafkaConfigManager {
}
}
- public KafkaConfig getKafkaConfig(String name) {
- try {
- return getStore().getResource(formatStreamingConfigPath(name), KafkaConfig.class, KafkaConfig.SERIALIZER);
- } catch (IOException e) {
- logger.error("error get resource name:" + name, e);
- throw new RuntimeException("error get resource name:" + name, e);
+ public KafkaConfig updateKafkaConfig(KafkaConfig desc) throws IOException {
+ // Validate KafkaConfig
+ if (desc.getUuid() == null || desc.getName() == null) {
+ throw new IllegalArgumentException();
+ }
+ String name = desc.getName();
+ if (!kafkaMap.containsKey(name)) {
+ throw new IllegalArgumentException("KafkaConfig '" + name + "' does not exist.");
+ }
+
+ // Save Source
+ String path = desc.getResourcePath();
+ getStore().putResource(path, desc, KAFKA_SERIALIZER);
+
+ // Reload the KafkaConfig
+ KafkaConfig ndesc = loadKafkaConfigAt(path);
+ // Here replace the old one
+ kafkaMap.put(ndesc.getName(), desc);
+
+ return ndesc;
+ }
+
+ private KafkaConfig loadKafkaConfigAt(String path) throws IOException {
+ ResourceStore store = getStore();
+ KafkaConfig kafkaConfig = store.getResource(path, KafkaConfig.class,KAFKA_SERIALIZER );
+
+ if (StringUtils.isBlank(kafkaConfig.getName())) {
+ throw new IllegalStateException("KafkaConfig name must not be blank");
}
+ return kafkaConfig;
+ }
+
+
+ public KafkaConfig getKafkaConfig(String name) {
+ return kafkaMap.get(name);
}
public void saveKafkaConfig(KafkaConfig kafkaConfig) throws IOException {
@@ -124,6 +206,44 @@ public class KafkaConfigManager {
getStore().putResource(path, kafkaConfig, KafkaConfig.SERIALIZER);
}
+ // remove kafkaConfig
+ public void removeKafkaConfig(KafkaConfig kafkaConfig) throws IOException {
+ String path = kafkaConfig.getResourcePath();
+ getStore().deleteResource(path);
+ kafkaMap.remove(kafkaConfig.getName());
+ }
+
+
+ private void reloadAllKafkaConfig() throws IOException {
+ ResourceStore store = getStore();
+ logger.info("Reloading Kafka Metadata from folder " + store.getReadableResourcePath(ResourceStore.KAfKA_RESOURCE_ROOT));
+
+ kafkaMap.clear();
+
+ List<String> paths = store.collectResourceRecursively(ResourceStore.KAfKA_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX);
+ for (String path : paths) {
+ KafkaConfig kafkaConfig;
+ try {
+ kafkaConfig = loadKafkaConfigAt(path);
+ } catch (Exception e) {
+ logger.error("Error loading kafkaConfig desc " + path, e);
+ continue;
+ }
+ if (path.equals(kafkaConfig.getResourcePath()) == false) {
+ logger.error("Skip suspicious desc at " + path + ", " + kafkaConfig + " should be at " + kafkaConfig.getResourcePath());
+ continue;
+ }
+ if (kafkaMap.containsKey(kafkaConfig.getName())) {
+ logger.error("Dup KafkaConfig name '" + kafkaConfig.getName() + "' on path " + path);
+ continue;
+ }
+
+ kafkaMap.putLocal(kafkaConfig.getName(), kafkaConfig);
+ }
+
+ logger.debug("Loaded " + kafkaMap.size() + " KafkaConfig(s)");
+ }
+
private final ObjectMapper mapper = new ObjectMapper();
private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(Integer.class), SimpleType.construct(Long.class));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index 1aff0ce..90df9f5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -42,12 +42,14 @@ import java.io.IOException;
import java.util.List;
import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.RootPersistentEntity;
import org.apache.kylin.common.persistence.Serializer;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonManagedReference;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kylin.metadata.MetadataConstants;
/**
*/
@@ -84,6 +86,13 @@ public class KafkaConfig extends RootPersistentEntity {
//"configA=1;configB=2"
@JsonProperty("parserProperties")
private String parserProperties;
+ public String getResourcePath() {
+ return getKafkaResourcePath(name);
+ }
+
+ public static String getKafkaResourcePath(String streamingName) {
+ return ResourceStore.KAfKA_RESOURCE_ROOT + "/" + streamingName + MetadataConstants.FILE_SURFIX;
+ }
public List<KafkaClusterConfig> getKafkaClusterConfigs() {
return kafkaClusterConfigs;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/index.html
----------------------------------------------------------------------
diff --git a/webapp/app/index.html b/webapp/app/index.html
index d2b7e68..caa8d2a 100644
--- a/webapp/app/index.html
+++ b/webapp/app/index.html
@@ -45,6 +45,7 @@
<link rel="stylesheet" type="text/css" href="css/AdminLTE.css">
<link rel="stylesheet" type="text/css" href="components/bootstrap-sweetalert/lib/sweet-alert.css">
<link rel="stylesheet" type="text/css" href="components/angular-bootstrap-nav-tree/dist/abn_tree.css">
+ <link rel="stylesheet" type="text/css" href="components/angular-toggle-switch/angular-toggle-switch.css">
<link rel="stylesheet/less" href="less/build.less">
<!-- endref -->
@@ -102,6 +103,7 @@
<script src="components/angular-underscore/angular-underscore.js"></script>
<script src="components/jquery-ui/jquery-ui.min.js"></script>
<script src="components/angular-ui-sortable/sortable.js"></script>
+<script src="components/angular-toggle-switch/angular-toggle-switch.js"></script>
<script src="js/app.js"></script>
<script src="js/config.js"></script>
@@ -120,6 +122,7 @@
<script src="js/services/model.js"></script>
<script src="js/services/cubes.js"></script>
+<script src="js/services/streaming.js"></script>
<script src="js/services/graph.js"></script>
<script src="js/services/jobs.js"></script>
<script src="js/services/message.js"></script>
@@ -136,6 +139,8 @@
<script src="js/model/jobConfig.js"></script>
<script src="js/model/projectConfig.js"></script>
<script src="js/model/tableConfig.js"></script>
+<script src="js/model/streamingModel.js"></script>
+<script src="js/model/streamingListModel.js"></script>
<!--New GUI-->
<script src="js/model/modelConfig.js"></script>
@@ -157,6 +162,7 @@
<script src="js/controllers/job.js"></script>
<script src="js/controllers/cube.js"></script>
<script src="js/controllers/cubes.js"></script>
+<script src="js/controllers/streaming.js"></script>
<script src="js/controllers/projects.js"></script>
<script src="js/controllers/cubeEdit.js"></script>
<script src="js/controllers/cubeSchema.js"></script>
@@ -166,15 +172,17 @@
<script src="js/controllers/projectMeta.js"></script>
<script src="js/controllers/cubeModel.js"></script>
<script src="js/controllers/cubeDimensions.js"></script>
-<script src="js/controllers/cubeFilter.js"></script>
<script src="js/controllers/cubeRefresh.js"></script>
<script src="js/controllers/cubeAdvanceSetting.js"></script>
+<script src="js/controllers/cubeMeasures.js"></script>
<!--New GUI-->
<script src="js/controllers/modelSchema.js"></script>
<script src="js/controllers/modelDimensions.js"></script>
<script src="js/controllers/modelRefresh.js"></script>
<script src="js/controllers/modelEdit.js"></script>
+<script src="js/controllers/streamingConfig.js"></script>
+
<!--New GUI-->
<script src="js/controllers/models.js"></script>
@@ -222,5 +230,27 @@
</div>
</script>
+<!-- static template for cube save/update result notification -->
+<script type="text/ng-template" id="streamingResultError.html">
+ <div class="callout">
+ <h4>Error Message</h4>
+ <p>{{text}}</p>
+ </div>
+ <div class="callout callout-danger">
+ <h4>Streaming Schema</h4>
+ <pre>{{streamingSchema}}</pre>
+ </div>
+ <div class="callout callout-danger">
+ <h4>Kafka Schema</h4>
+ <pre>{{kfkSchema}}</pre>
+ </div>
+</script>
+
+<script type="text/ng-template" id="streamingResultSuccess.html">
+ <div class="callout callout-info">
+ <p>{{text}}</p>
+ </div>
+
+
</body>
</html>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/js/app.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/app.js b/webapp/app/js/app.js
index 1e00bdc..dc9d087 100644
--- a/webapp/app/js/app.js
+++ b/webapp/app/js/app.js
@@ -17,4 +17,4 @@
*/
//Kylin Application Module
-KylinApp = angular.module('kylin', ['ngRoute', 'ngResource', 'ngGrid', 'ui.bootstrap', 'ui.ace', 'base64', 'angularLocalStorage', 'localytics.directives', 'treeControl', 'nvd3ChartDirectives','ngLoadingRequest','oitozero.ngSweetAlert','ngCookies','angular-underscore', 'ngAnimate', 'ui.sortable','angularBootstrapNavTree']);
+KylinApp = angular.module('kylin', ['ngRoute', 'ngResource', 'ngGrid', 'ui.bootstrap', 'ui.ace', 'base64', 'angularLocalStorage', 'localytics.directives', 'treeControl', 'nvd3ChartDirectives','ngLoadingRequest','oitozero.ngSweetAlert','ngCookies','angular-underscore', 'ngAnimate', 'ui.sortable','angularBootstrapNavTree','toggle-switch']);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/js/controllers/cubeAdvanceSetting.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/cubeAdvanceSetting.js b/webapp/app/js/controllers/cubeAdvanceSetting.js
index fb9c019..fc02c6e 100644
--- a/webapp/app/js/controllers/cubeAdvanceSetting.js
+++ b/webapp/app/js/controllers/cubeAdvanceSetting.js
@@ -98,4 +98,32 @@ KylinApp.controller('CubeAdvanceSettingCtrl', function ($scope, $modal,cubeConfi
})
}
+
+ $scope.addNewRowkeyColumn = function () {
+ $scope.cubeMetaFrame.rowkey.rowkey_columns.push({
+ "column": "",
+ "length": 0,
+ "dictionary": "true",
+ "mandatory": false
+ });
+ };
+
+ $scope.addNewAggregationGroup = function () {
+ $scope.cubeMetaFrame.rowkey.aggregation_groups.push([]);
+ };
+
+ $scope.refreshAggregationGroup = function (list, index, aggregation_groups) {
+ if (aggregation_groups) {
+ list[index] = aggregation_groups;
+ }
+ };
+
+ $scope.removeElement = function (arr, element) {
+ var index = arr.indexOf(element);
+ if (index > -1) {
+ arr.splice(index, 1);
+ }
+ };
+
+
});