You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/10/09 14:17:38 UTC
[6/9] kylin git commit: KYLIN-2072 Cleanup old streaming code
http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
deleted file mode 100644
index 271bf41..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.JsonSerializer;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.cachesync.Broadcaster;
-import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
-import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class StreamingManager {
-
- private static final Logger logger = LoggerFactory.getLogger(StreamingManager.class);
-
- // static cached instances
- private static final ConcurrentHashMap<KylinConfig, StreamingManager> CACHE = new ConcurrentHashMap<KylinConfig, StreamingManager>();
-
- public static final Serializer<StreamingConfig> STREAMING_SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class);
-
- private KylinConfig config;
-
- // name ==> StreamingConfig
- private CaseInsensitiveStringCache<StreamingConfig> streamingMap;
-
- public static void clearCache() {
- CACHE.clear();
- }
-
- private StreamingManager(KylinConfig config) throws IOException {
- this.config = config;
- this.streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(config, "streaming");
-
- // touch lower level metadata before registering my listener
- reloadAllStreaming();
- Broadcaster.getInstance(config).registerListener(new StreamingSyncListener(), "streaming");
- }
-
- private class StreamingSyncListener extends Broadcaster.Listener {
- @Override
- public void onClearAll(Broadcaster broadcaster) throws IOException {
- clearCache();
- }
-
- @Override
- public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
- if (event == Event.DROP)
- removeStreamingLocal(cacheKey);
- else
- reloadStreamingConfigLocal(cacheKey);
- }
- }
-
- private ResourceStore getStore() {
- return ResourceStore.getStore(this.config);
- }
-
- public static StreamingManager getInstance(KylinConfig config) {
- StreamingManager r = CACHE.get(config);
- if (r != null) {
- return r;
- }
-
- synchronized (StreamingManager.class) {
- r = CACHE.get(config);
- if (r != null) {
- return r;
- }
- try {
- r = new StreamingManager(config);
- CACHE.put(config, r);
- if (CACHE.size() > 1) {
- logger.warn("More than one singleton exist");
- }
- return r;
- } catch (IOException e) {
- throw new IllegalStateException("Failed to init StreamingManager from " + config, e);
- }
- }
- }
-
- private static String formatStreamingConfigPath(String name) {
- return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
- }
-
- private static String formatStreamingOutputPath(String streaming, int partition) {
- return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + partition + ".json";
- }
-
- private static String formatStreamingOutputPath(String streaming, List<Integer> partitions) {
- return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json";
- }
-
- public StreamingConfig getStreamingConfig(String name) {
- return streamingMap.get(name);
- }
-
- public List<StreamingConfig> listAllStreaming() {
- return new ArrayList<>(streamingMap.values());
- }
-
- /**
- * Reload StreamingConfig from resource store It will be triggered by an desc
- * update event.
- *
- * @param name
- * @throws IOException
- */
- public StreamingConfig reloadStreamingConfigLocal(String name) throws IOException {
-
- // Save Source
- String path = StreamingConfig.concatResourcePath(name);
-
- // Reload the StreamingConfig
- StreamingConfig ndesc = loadStreamingConfigAt(path);
-
- // Here replace the old one
- streamingMap.putLocal(ndesc.getName(), ndesc);
- return ndesc;
- }
-
- // remove streamingConfig
- public void removeStreamingConfig(StreamingConfig streamingConfig) throws IOException {
- String path = streamingConfig.getResourcePath();
- getStore().deleteResource(path);
- streamingMap.remove(streamingConfig.getName());
- }
-
- public StreamingConfig getConfig(String name) {
- name = name.toUpperCase();
- return streamingMap.get(name);
- }
-
- public void removeStreamingLocal(String streamingName) {
- streamingMap.removeLocal(streamingName);
- }
-
- /**
- * Update CubeDesc with the input. Broadcast the event into cluster
- *
- * @param desc
- * @return
- * @throws IOException
- */
- public StreamingConfig updateStreamingConfig(StreamingConfig desc) throws IOException {
- // Validate CubeDesc
- if (desc.getUuid() == null || desc.getName() == null) {
- throw new IllegalArgumentException("SteamingConfig Illegal.");
- }
- String name = desc.getName();
- if (!streamingMap.containsKey(name)) {
- throw new IllegalArgumentException("StreamingConfig '" + name + "' does not exist.");
- }
-
- // Save Source
- String path = desc.getResourcePath();
- getStore().putResource(path, desc, STREAMING_SERIALIZER);
-
- // Reload the StreamingConfig
- StreamingConfig ndesc = loadStreamingConfigAt(path);
- // Here replace the old one
- streamingMap.put(ndesc.getName(), desc);
-
- return ndesc;
- }
-
- public StreamingConfig saveStreamingConfig(StreamingConfig streamingConfig) throws IOException {
- if (streamingConfig == null || StringUtils.isEmpty(streamingConfig.getName())) {
- throw new IllegalArgumentException();
- }
-
- if (streamingMap.containsKey(streamingConfig.getName()))
- throw new IllegalArgumentException("StreamingConfig '" + streamingConfig.getName() + "' already exists");
-
- String path = StreamingConfig.concatResourcePath(streamingConfig.getName());
- getStore().putResource(path, streamingConfig, StreamingConfig.SERIALIZER);
- streamingMap.put(streamingConfig.getName(), streamingConfig);
- return streamingConfig;
- }
-
- private StreamingConfig loadStreamingConfigAt(String path) throws IOException {
- ResourceStore store = getStore();
- StreamingConfig streamingDesc = store.getResource(path, StreamingConfig.class, STREAMING_SERIALIZER);
-
- if (StringUtils.isBlank(streamingDesc.getName())) {
- throw new IllegalStateException("StreamingConfig name must not be blank");
- }
- return streamingDesc;
- }
-
- private void reloadAllStreaming() throws IOException {
- ResourceStore store = getStore();
- logger.info("Reloading Streaming Metadata from folder " + store.getReadableResourcePath(ResourceStore.STREAMING_RESOURCE_ROOT));
-
- streamingMap.clear();
-
- List<String> paths = store.collectResourceRecursively(ResourceStore.STREAMING_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX);
- for (String path : paths) {
- StreamingConfig streamingConfig;
- try {
- streamingConfig = loadStreamingConfigAt(path);
- } catch (Exception e) {
- logger.error("Error loading streaming desc " + path, e);
- continue;
- }
- if (path.equals(streamingConfig.getResourcePath()) == false) {
- logger.error("Skip suspicious desc at " + path + ", " + streamingConfig + " should be at " + streamingConfig.getResourcePath());
- continue;
- }
- if (streamingMap.containsKey(streamingConfig.getName())) {
- logger.error("Dup StreamingConfig name '" + streamingConfig.getName() + "' on path " + path);
- continue;
- }
-
- streamingMap.putLocal(streamingConfig.getName(), streamingConfig);
- }
-
- logger.debug("Loaded " + streamingMap.size() + " StreamingConfig(s)");
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java
deleted file mode 100644
index 32030ad..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.streaming.cli;
-
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * Command line entry point that runs {@link StreamingMonitor} checks.
- *
- * <p>Arguments: the literal {@code monitor} followed by option/value pairs
- * {@code -receivers a;b}, {@code -host h}, {@code -tableName t},
- * {@code -authorization auth}, {@code -cubeName c}, {@code -projectName p}.
- * {@code -receivers} is mandatory; {@code -projectName} falls back to
- * {@code default}.
- */
-public class MonitorCLI {
-
-    private static final Logger logger = LoggerFactory.getLogger(MonitorCLI.class);
-
-    /**
-     * Parses the options, runs the count check when a table name is given and
-     * the cube check when a cube name is given, then exits with status 0.
-     *
-     * @param args command line, see the class comment
-     * @throws IllegalArgumentException if the first argument is not
-     *         {@code monitor}, an option has no value, or no receiver is given
-     * @throws RuntimeException if an unknown option is encountered
-     */
-    public static void main(String[] args) {
-        // also covers an empty command line, which used to fail with
-        // ArrayIndexOutOfBoundsException
-        Preconditions.checkArgument(args.length > 0 && "monitor".equals(args[0]), "first argument must be 'monitor'");
-
-        int i = 1;
-        List<String> receivers = null;
-        String host = null;
-        String tableName = null;
-        String authorization = null;
-        String cubeName = null;
-        String projectName = "default";
-        while (i < args.length) {
-            String argName = args[i];
-            switch (argName) {
-            case "-receivers":
-                receivers = Lists.newArrayList(StringUtils.split(optionValue(args, ++i, argName), ";"));
-                break;
-            case "-host":
-                host = optionValue(args, ++i, argName);
-                break;
-            case "-tableName":
-                tableName = optionValue(args, ++i, argName);
-                break;
-            case "-authorization":
-                authorization = optionValue(args, ++i, argName);
-                break;
-            case "-cubeName":
-                cubeName = optionValue(args, ++i, argName);
-                break;
-            case "-projectName":
-                projectName = optionValue(args, ++i, argName);
-                break;
-            default:
-                throw new RuntimeException("invalid argName:" + argName);
-            }
-            i++;
-        }
-        Preconditions.checkArgument(receivers != null && receivers.size() > 0);
-        final StreamingMonitor streamingMonitor = new StreamingMonitor();
-        if (tableName != null) {
-            logger.info(String.format("check query tableName:%s host:%s receivers:%s", tableName, host, StringUtils.join(receivers, ";")));
-            Preconditions.checkNotNull(host);
-            Preconditions.checkNotNull(authorization);
-            Preconditions.checkNotNull(tableName);
-            streamingMonitor.checkCountAll(receivers, host, authorization, projectName, tableName);
-        }
-        if (cubeName != null) {
-            logger.info(String.format("check cube cubeName:%s receivers:%s", cubeName, StringUtils.join(receivers, ";")));
-            // NOTE(review): host may still be null here; confirm checkCube accepts that
-            streamingMonitor.checkCube(receivers, cubeName, host);
-        }
-        System.exit(0);
-    }
-
-    /**
-     * Returns the value that follows an option, failing with a readable
-     * message when the option was the last token on the command line.
-     */
-    private static String optionValue(String[] args, int index, String argName) {
-        Preconditions.checkArgument(index < args.length, "missing value for option %s", argName);
-        return args[index];
-    }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
deleted file mode 100644
index 1d66b41..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming.cli;
-
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.engine.streaming.BootstrapConfig;
-import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
-import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-public class StreamingCLI {
-
- private static final Logger logger = LoggerFactory.getLogger(StreamingCLI.class);
-
- public static void main(String[] args) {
- try {
- Preconditions.checkArgument(args[0].equals("streaming"));
- Preconditions.checkArgument(args[1].equals("start"));
-
- int i = 2;
- BootstrapConfig bootstrapConfig = new BootstrapConfig();
- while (i < args.length) {
- String argName = args[i];
- switch (argName) {
- case "-start":
- bootstrapConfig.setStart(Long.parseLong(args[++i]));
- break;
- case "-end":
- bootstrapConfig.setEnd(Long.parseLong(args[++i]));
- break;
- case "-cube":
- bootstrapConfig.setCubeName(args[++i]);
- break;
- case "-fillGap":
- bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i]));
- break;
- case "-maxFillGapRange":
- bootstrapConfig.setMaxFillGapRange(Long.parseLong(args[++i]));
- break;
- default:
- logger.warn("ignore this arg:" + argName);
- }
- i++;
- }
- if (bootstrapConfig.isFillGap()) {
- final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(bootstrapConfig.getCubeName());
- logger.info("all gaps:" + StringUtils.join(gaps, ","));
- for (Pair<Long, Long> gap : gaps) {
- List<Pair<Long, Long>> splitGaps = splitGap(gap, bootstrapConfig.getMaxFillGapRange());
- for (Pair<Long, Long> splitGap : splitGaps) {
- logger.info("start filling the gap from " + splitGap.getFirst() + " to " + splitGap.getSecond());
- startOneOffCubeStreaming(bootstrapConfig.getCubeName(), splitGap.getFirst(), splitGap.getSecond());
- logger.info("finish filling the gap from " + splitGap.getFirst() + " to " + splitGap.getSecond());
- }
- }
- } else {
- startOneOffCubeStreaming(bootstrapConfig.getCubeName(), bootstrapConfig.getStart(), bootstrapConfig.getEnd());
- logger.info("streaming process finished, exit with 0");
- System.exit(0);
- }
- } catch (Exception e) {
- printArgsError(args);
- logger.error("error start streaming", e);
- System.exit(-1);
- }
- }
-
- private static List<Pair<Long, Long>> splitGap(Pair<Long, Long> gap, long maxFillGapRange) {
- List<Pair<Long, Long>> gaps = Lists.newArrayList();
- Long startTime = gap.getFirst();
-
- while (startTime < gap.getSecond()) {
- Long endTime = gap.getSecond() <= startTime + maxFillGapRange ? gap.getSecond() : startTime + maxFillGapRange;
- gaps.add(Pair.newPair(startTime, endTime));
- startTime = endTime;
- }
-
- return gaps;
- }
-
- private static void startOneOffCubeStreaming(String cubeName, long start, long end) {
- final Runnable runnable = new OneOffStreamingBuilder(RealizationType.CUBE, cubeName, start, end).build();
- runnable.run();
- }
-
- private static void printArgsError(String[] args) {
- logger.warn("invalid args:" + StringUtils.join(args, " "));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
deleted file mode 100644
index 350a5f8..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming.cube;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.annotation.Nullable;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.StreamingBatch;
-import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
-import org.apache.kylin.cube.inmemcubing.InMemCubeBuilder;
-import org.apache.kylin.cube.util.CubingUtils;
-import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.engine.streaming.StreamingBatchBuilder;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.model.IBuildable;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-
-/**
- */
-public class StreamingCubeBuilder implements StreamingBatchBuilder {
-
- private static final Logger logger = LoggerFactory.getLogger(StreamingCubeBuilder.class);
-
- private final String cubeName;
- private int processedRowCount = 0;
-
- public StreamingCubeBuilder(String cubeName) {
- this.cubeName = cubeName;
- }
-
- @Override
- public void build(StreamingBatch streamingBatch, Map<TblColRef, Dictionary<String>> dictionaryMap, ICuboidWriter cuboidWriter) {
- try {
- CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
- final IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeInstance.getDescriptor());
-
- LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>();
- InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(cubeInstance.getDescriptor(), flatDesc, dictionaryMap);
- final Future<?> future = Executors.newCachedThreadPool().submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, cuboidWriter));
- processedRowCount = streamingBatch.getMessages().size();
- for (StreamingMessage streamingMessage : streamingBatch.getMessages()) {
- blockingQueue.put(streamingMessage.getData());
- }
- blockingQueue.put(Collections.<String> emptyList());
- future.get();
- cuboidWriter.flush();
-
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
- } catch (IOException e) {
- throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
- } finally {
- try {
- cuboidWriter.close();
- } catch (IOException e) {
- throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
- }
- }
- }
-
- @Override
- public IBuildable createBuildable(StreamingBatch streamingBatch) {
- CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
- try {
- CubeSegment segment = cubeManager.appendSegment(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond());
- segment.setLastBuildJobID(segment.getUuid()); // give a fake job id
- segment.setInputRecords(streamingBatch.getMessages().size());
- segment.setLastBuildTime(System.currentTimeMillis());
- return segment;
- } catch (IOException e) {
- throw new RuntimeException("failed to create IBuildable", e);
- }
- }
-
- @Override
- public Map<Long, HyperLogLogPlusCounter> sampling(StreamingBatch streamingBatch) {
- final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
- final IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeInstance.getDescriptor());
- long start = System.currentTimeMillis();
-
- final Map<Long, HyperLogLogPlusCounter> samplingResult = CubingUtils.sampling(cubeInstance.getDescriptor(), flatDesc, Lists.transform(streamingBatch.getMessages(), new Function<StreamingMessage, List<String>>() {
- @Nullable
- @Override
- public List<String> apply(@Nullable StreamingMessage input) {
- return input.getData();
- }
- }));
- logger.info(String.format("sampling of %d messages cost %d ms", streamingBatch.getMessages().size(), (System.currentTimeMillis() - start)));
- return samplingResult;
- }
-
- @Override
- public Map<TblColRef, Dictionary<String>> buildDictionary(StreamingBatch streamingBatch, IBuildable buildable) {
- final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
- final Map<TblColRef, Dictionary<String>> dictionaryMap;
- try {
- dictionaryMap = CubingUtils.buildDictionary(cubeInstance, Lists.transform(streamingBatch.getMessages(), new Function<StreamingMessage, List<String>>() {
- @Nullable
- @Override
- public List<String> apply(@Nullable StreamingMessage input) {
- return input.getData();
- }
- }));
- Map<TblColRef, Dictionary<String>> realDictMap = CubingUtils.writeDictionary((CubeSegment) buildable, dictionaryMap, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond());
- return realDictMap;
- } catch (IOException e) {
- throw new RuntimeException("failed to build dictionary", e);
- }
- }
-
- @Override
- public void commit(IBuildable buildable) {
- CubeSegment cubeSegment = (CubeSegment) buildable;
- cubeSegment.setStatus(SegmentStatusEnum.READY);
- cubeSegment.setInputRecords(processedRowCount);
- CubeUpdate cubeBuilder = new CubeUpdate(cubeSegment.getCubeInstance());
- cubeBuilder.setToUpdateSegs(cubeSegment);
- try {
- CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).updateCube(cubeBuilder);
- } catch (IOException e) {
- throw new RuntimeException("failed to update CubeSegment", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/diagnose/StreamingLogAnalyzer.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/diagnose/StreamingLogAnalyzer.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/diagnose/StreamingLogAnalyzer.java
deleted file mode 100644
index fba664d..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/diagnose/StreamingLogAnalyzer.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.streaming.diagnose;
-
-import java.io.File;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.io.FileUtils;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-public class StreamingLogAnalyzer {
- public static void main(String[] args) {
- int errorFileCount = 0;
- List<Long> ellapsedTimes = Lists.newArrayList();
-
- String patternStr = "(\\d{2}/\\d{2}/\\d{2} \\d{2}:\\d{2}:\\d{2})";
- Pattern pattern = Pattern.compile(patternStr);
-
- SimpleDateFormat format = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
- format.setTimeZone(TimeZone.getTimeZone("GMT")); // NOTE: this must be GMT to calculate epoch date correctly
-
- Preconditions.checkArgument(args.length == 1, "Usage: StreamingLogsAnalyser streaming_logs_folder");
- for (File file : FileUtils.listFiles(new File(args[0]), new String[] { "log" }, false)) {
- System.out.println("Processing file " + file.toString());
-
- long startTime = 0;
- long endTime = 0;
- try {
- List<String> contents = Files.readAllLines(file.toPath(), Charset.defaultCharset());
- for (int i = 0; i < contents.size(); ++i) {
- Matcher m = pattern.matcher(contents.get(i));
- if (m.find()) {
- startTime = format.parse("20" + m.group(1)).getTime();
- break;
- }
- }
-
- for (int i = contents.size() - 1; i >= 0; --i) {
- Matcher m = pattern.matcher(contents.get(i));
- if (m.find()) {
- endTime = format.parse("20" + m.group(1)).getTime();
- break;
- }
- }
-
- if (startTime == 0 || endTime == 0) {
- throw new RuntimeException("start time or end time is not found");
- }
-
- if (endTime - startTime < 60000) {
- System.out.println("Warning: this job took less than one minute!!!! " + file.toString());
- }
-
- ellapsedTimes.add(endTime - startTime);
-
- } catch (Exception e) {
- System.out.println("Exception when processing log file " + file.toString());
- System.out.println(e);
- errorFileCount++;
- }
- }
-
- System.out.println("Totally error files count " + errorFileCount);
- System.out.println("Totally normal files processed " + ellapsedTimes.size());
-
- long sum = 0;
- for (Long x : ellapsedTimes) {
- sum += x;
- }
- System.out.println("Avg build time " + (sum / ellapsedTimes.size()));
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
deleted file mode 100644
index 55252c4..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.streaming.monitor;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
-import org.apache.commons.httpclient.methods.PostMethod;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.common.util.MailService;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- */
-public class StreamingMonitor {
-
- private static final Logger logger = LoggerFactory.getLogger(StreamingMonitor.class);
-
- public void checkCountAll(List<String> receivers, String host, String authorization, String projectName, String tableName) {
- String title = "checkCountAll job(host:" + host + " tableName:" + tableName + ") ";
- StringBuilder stringBuilder = new StringBuilder();
- String url = host + "/kylin/api/query";
- PostMethod request = new PostMethod(url);
- try {
-
- request.addRequestHeader("Authorization", "Basic " + authorization);
- request.addRequestHeader("Content-Type", "application/json");
- String query = String.format("{\"sql\":\"select count(*) from %s\",\"offset\":0,\"limit\":50000,\"acceptPartial\":true,\"project\":\"%s\"}", tableName, projectName);
- request.setRequestEntity(new ByteArrayRequestEntity(query.getBytes()));
-
- int statusCode = new HttpClient().executeMethod(request);
- String msg = Bytes.toString(request.getResponseBody());
- stringBuilder.append("host:").append(host).append("\n");
- stringBuilder.append("query:").append(query).append("\n");
- stringBuilder.append("statusCode:").append(statusCode).append("\n");
- if (statusCode == 200) {
- title += "succeed";
- final HashMap<?, ?> hashMap = JsonUtil.readValue(msg, HashMap.class);
- stringBuilder.append("results:").append(hashMap.get("results").toString()).append("\n");
- stringBuilder.append("duration:").append(hashMap.get("duration").toString()).append("\n");
- } else {
- title += "failed";
- stringBuilder.append("response:").append(msg).append("\n");
- }
- } catch (Exception e) {
- final StringWriter out = new StringWriter();
- e.printStackTrace(new PrintWriter(out));
- title += "failed";
- stringBuilder.append(out.toString());
- } finally {
- request.releaseConnection();
- }
- logger.info("title:" + title);
- logger.info("content:" + stringBuilder.toString());
- sendMail(receivers, title, stringBuilder.toString());
- }
-
- public static final List<Pair<Long, Long>> findGaps(String cubeName) {
- List<CubeSegment> segments = getSortedReadySegments(cubeName);
- List<Pair<Long, Long>> gaps = Lists.newArrayList();
- for (int i = 0; i < segments.size() - 1; ++i) {
- CubeSegment first = segments.get(i);
- CubeSegment second = segments.get(i + 1);
- if (first.getDateRangeEnd() == second.getDateRangeStart()) {
- continue;
- } else if (first.getDateRangeEnd() < second.getDateRangeStart()) {
- gaps.add(Pair.newPair(first.getDateRangeEnd(), second.getDateRangeStart()));
- }
- }
- return gaps;
- }
-
- private static List<CubeSegment> getSortedReadySegments(String cubeName) {
- final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
- Preconditions.checkNotNull(cube);
- final List<CubeSegment> segments = cube.getSegments(SegmentStatusEnum.READY);
- logger.info("totally " + segments.size() + " cubeSegments");
- Collections.sort(segments);
- return segments;
- }
-
- public static final List<Pair<String, String>> findOverlaps(String cubeName) {
- List<CubeSegment> segments = getSortedReadySegments(cubeName);
- List<Pair<String, String>> overlaps = Lists.newArrayList();
- for (int i = 0; i < segments.size() - 1; ++i) {
- CubeSegment first = segments.get(i);
- CubeSegment second = segments.get(i + 1);
- if (first.getDateRangeEnd() == second.getDateRangeStart()) {
- continue;
- } else {
- overlaps.add(Pair.newPair(first.getName(), second.getName()));
- }
- }
- return overlaps;
- }
-
- public void checkCube(List<String> receivers, String cubeName, String host) {
- final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
- if (cube == null) {
- logger.info("cube:" + cubeName + " does not exist");
- return;
- }
- List<Pair<Long, Long>> gaps = findGaps(cubeName);
- List<Pair<String, String>> overlaps = Lists.newArrayList();
- StringBuilder content = new StringBuilder();
- if (!gaps.isEmpty()) {
- content.append("all gaps:").append("\n").append(StringUtils.join(Lists.transform(gaps, new Function<Pair<Long, Long>, String>() {
- @Nullable
- @Override
- public String apply(Pair<Long, Long> input) {
- return parseInterval(input);
- }
- }), "\n")).append("\n");
- }
- if (!overlaps.isEmpty()) {
- content.append("all overlaps:").append("\n").append(StringUtils.join(overlaps, "\n")).append("\n");
- }
- if (content.length() > 0) {
- logger.info(content.toString());
- sendMail(receivers, String.format("%s has gaps or overlaps on host %s", cubeName, host), content.toString());
- } else {
- logger.info("no gaps or overlaps");
- }
- }
-
- private String parseInterval(Pair<Long, Long> interval) {
- return String.format("{%d(%s), %d(%s)}", interval.getFirst(), new Date(interval.getFirst()).toString(), interval.getSecond(), new Date(interval.getSecond()).toString());
- }
-
- private void sendMail(List<String> receivers, String title, String content) {
- final MailService mailService = new MailService(KylinConfig.getInstanceFromEnv());
- mailService.sendMail(receivers, title, content, false);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
deleted file mode 100644
index 5790bc1..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming.util;
-
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.engine.streaming.IStreamingInput;
-import org.apache.kylin.engine.streaming.IStreamingOutput;
-import org.apache.kylin.engine.streaming.StreamingBatchBuilder;
-import org.apache.kylin.engine.streaming.cube.StreamingCubeBuilder;
-import org.apache.kylin.metadata.realization.RealizationType;
-
-import com.google.common.base.Preconditions;
-
-/**
- * TODO: like MRUtil, use Factory pattern to allow config
- */
-public class StreamingUtils {
-
- public static IStreamingInput getStreamingInput() {
- return (IStreamingInput) ClassUtil.newInstance("org.apache.kylin.source.kafka.KafkaStreamingInput");
- }
-
- public static IStreamingOutput getStreamingOutput() {
- return (IStreamingOutput) ClassUtil.newInstance("org.apache.kylin.storage.hbase.steps.HBaseStreamingOutput");
- }
-
- public static StreamingBatchBuilder getMicroBatchBuilder(RealizationType realizationType, String realizationName) {
- Preconditions.checkNotNull(realizationName);
- if (realizationType == RealizationType.CUBE) {
- return new StreamingCubeBuilder(realizationName);
- } else {
- throw new UnsupportedOperationException("not implemented yet");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index c30abc0..a47fcde 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -44,8 +44,8 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.CubeUpdate;
import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.engine.streaming.StreamingConfig;
-import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingManager;
import org.apache.kylin.job.DeployUtil;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 09ef0e8..72e4069 100644
--- a/pom.xml
+++ b/pom.xml
@@ -225,11 +225,6 @@
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
- <artifactId>kylin-engine-streaming</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.kylin</groupId>
<artifactId>kylin-engine-spark</artifactId>
<version>${project.version}</version>
</dependency>
@@ -1017,7 +1012,6 @@
<module>core-job</module>
<module>core-storage</module>
<module>engine-mr</module>
- <module>engine-streaming</module>
<module>engine-spark</module>
<module>source-hive</module>
<module>source-kafka</module>
http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index f3374c3..a5fb874 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -26,7 +26,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.rest.exception.BadRequestException;
http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
index c4af5f4..34cc57f 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
@@ -32,7 +32,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.ColumnDesc;
http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
index abf0638..170c395 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
@@ -29,7 +29,7 @@ import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.metadata.streaming.StreamingManager;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java
index e49e882..7310d9c 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.exception.InternalErrorException;
import org.springframework.beans.factory.annotation.Autowired;
http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/source-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml
index d4cdfd5..e2100c4 100644
--- a/source-kafka/pom.xml
+++ b/source-kafka/pom.xml
@@ -43,12 +43,6 @@
<artifactId>kylin-core-common</artifactId>
</dependency>
-
- <dependency>
- <groupId>org.apache.kylin</groupId>
- <artifactId>kylin-engine-streaming</artifactId>
- </dependency>
-
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java
deleted file mode 100644
index 6981096..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.kafka;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
-/**
- */
-class ByteBufferBackedInputStream extends InputStream {
-
- private ByteBuffer buf;
-
- public ByteBufferBackedInputStream(ByteBuffer buf) {
- this.buf = buf;
- }
-
- @Override
- public int read() throws IOException {
- if (!buf.hasRemaining()) {
- return -1;
- }
- return buf.get() & 0xFF;
- }
-
- @Override
- public int read(byte[] bytes, int off, int len) throws IOException {
- if (!buf.hasRemaining()) {
- return -1;
- }
-
- len = Math.min(len, buf.remaining());
- buf.get(bytes, off, len);
- return len;
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index d039583..208c0ce 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -20,7 +20,7 @@ package org.apache.kylin.source.kafka;
import com.google.common.collect.Lists;
import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.source.ISource;
import org.apache.kylin.source.ReadableTable;
http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
deleted file mode 100644
index 78a67c2..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.kafka;
-
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import com.google.common.base.Function;
-import kafka.cluster.BrokerEndPoint;
-import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.StreamingBatch;
-import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.engine.streaming.IStreamingInput;
-import org.apache.kylin.engine.streaming.StreamingConfig;
-import org.apache.kylin.engine.streaming.StreamingManager;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.apache.kylin.source.kafka.util.KafkaRequester;
-import org.apache.kylin.source.kafka.util.KafkaUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-import kafka.cluster.Broker;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.PartitionMetadata;
-import kafka.message.MessageAndOffset;
-
-import javax.annotation.Nullable;
-
-@SuppressWarnings("unused")
-public class KafkaStreamingInput implements IStreamingInput {
-
- private static final Logger logger = LoggerFactory.getLogger(KafkaStreamingInput.class);
-
- @Override
- public StreamingBatch getBatchWithTimeWindow(RealizationType realizationType, String realizationName, int id, long startTime, long endTime) {
- if (realizationType != RealizationType.CUBE) {
- throw new IllegalArgumentException("Unsupported realization in KafkaStreamingInput: " + realizationType);
- }
- final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(realizationName);
- final String streaming = cube.getFactTable();
- final StreamingManager streamingManager = StreamingManager.getInstance(kylinConfig);
- final StreamingConfig streamingConfig = streamingManager.getConfig(streaming);
- if (streamingConfig == null) {
- throw new IllegalArgumentException("Table " + streaming + " is not a streaming table.");
- }
- if (StreamingConfig.STREAMING_TYPE_KAFKA.equals(streamingConfig.getType())) {
- logger.info(String.format("prepare to get streaming batch, name:%s, id:%d, startTime:%d, endTime:%d", streaming, id, startTime, endTime));
-
- try {
- final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
- final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(streaming);
- List<TblColRef> columns = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor()).getAllColumns();
-
- final StreamingParser streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns);
- final ExecutorService executorService = Executors.newCachedThreadPool();
- final List<Future<List<StreamingMessage>>> futures = Lists.newArrayList();
- for (final KafkaClusterConfig kafkaClusterConfig : kafkaConfig.getKafkaClusterConfigs()) {
-
- final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
- for (int i = 0; i < partitionCount; ++i) {
- final StreamingMessageProducer producer = new StreamingMessageProducer(kafkaClusterConfig, i, Pair.newPair(startTime, endTime), kafkaConfig.getMargin(), streamingParser);
- final Future<List<StreamingMessage>> future = executorService.submit(producer);
- futures.add(future);
- }
- }
- List<StreamingMessage> messages = Lists.newLinkedList();
- for (Future<List<StreamingMessage>> future : futures) {
- try {
- messages.addAll(future.get());
- } catch (InterruptedException e) {
- logger.warn("this thread should not be interrupted, just ignore", e);
- continue;
- } catch (ExecutionException e) {
- throw new RuntimeException("error when get StreamingMessages", e.getCause());
- }
- }
- final Pair<Long, Long> timeRange = Pair.newPair(startTime, endTime);
- logger.info("finish to get streaming batch, total message count:" + messages.size());
- return new StreamingBatch(messages, timeRange);
- } catch (ReflectiveOperationException e) {
- throw new RuntimeException("failed to create instance of StreamingParser", e);
- }
- } else {
- throw new IllegalArgumentException("kafka is the only supported streaming type.");
- }
- }
-
- private static class StreamingMessageProducer implements Callable<List<StreamingMessage>> {
-
- private final KafkaClusterConfig kafkaClusterConfig;
- private final int partitionId;
- private final StreamingParser streamingParser;
- private final Pair<Long, Long> timeRange;
- private final long margin;
-
- private List<Broker> replicaBrokers;
-
- StreamingMessageProducer(KafkaClusterConfig kafkaClusterConfig, int partitionId, Pair<Long, Long> timeRange, long margin, StreamingParser streamingParser) {
- this.kafkaClusterConfig = kafkaClusterConfig;
- this.partitionId = partitionId;
- this.streamingParser = streamingParser;
- this.margin = margin;
- this.timeRange = timeRange;
- this.replicaBrokers = kafkaClusterConfig.getBrokers();
- }
-
- private Broker getLeadBroker() {
- final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, replicaBrokers, kafkaClusterConfig);
- if (partitionMetadata != null) {
- if (partitionMetadata.errorCode() != 0) {
- logger.warn("PartitionMetadata errorCode: " + partitionMetadata.errorCode());
- }
- replicaBrokers = Lists.transform(partitionMetadata.replicas(), new Function<BrokerEndPoint, Broker>() {
- @Nullable
- @Override
- public Broker apply(@Nullable BrokerEndPoint brokerEndPoint) {
- return new Broker(brokerEndPoint, SecurityProtocol.PLAINTEXT);
- }
- });
- BrokerEndPoint leaderEndpoint = partitionMetadata.leader();
-
- return new Broker(leaderEndpoint, SecurityProtocol.PLAINTEXT);
- } else {
- return null;
- }
- }
-
- @Override
- public List<StreamingMessage> call() throws Exception {
- List<StreamingMessage> result = Lists.newLinkedList();
- try {
- long startTimestamp = timeRange.getFirst() - margin;
- long offset = KafkaUtils.findClosestOffsetWithDataTimestamp(kafkaClusterConfig, partitionId, startTimestamp, streamingParser);
- int fetchRound = 0;
- int consumeMsgCount = 0;
- Broker leadBroker = null;
- String topic = kafkaClusterConfig.getTopic();
- while (true) {
- boolean outOfMargin = false;
- int consumeMsgCountAtBeginning = consumeMsgCount;
- fetchRound++;
-
- if (leadBroker == null) {
- leadBroker = getLeadBroker();
- }
-
- if (leadBroker == null) {
- logger.warn("cannot find lead broker, wait 5s");
- Thread.sleep(5000);
- continue;
- }
-
- logger.info("fetching topic {} partition id {} offset {} leader {}", topic, String.valueOf(partitionId), String.valueOf(offset), leadBroker.toString());
-
- final FetchResponse fetchResponse = KafkaRequester.fetchResponse(topic, partitionId, offset, leadBroker, kafkaClusterConfig);
- if (fetchResponse.errorCode(topic, partitionId) != 0) {
- logger.warn("fetch response offset:" + offset + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
- Thread.sleep(30000);
- continue;
- }
-
- for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) {
- offset++;
- consumeMsgCount++;
- final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload());
- streamingMessage.setOffset(messageAndOffset.offset());
- if (streamingParser.filter(streamingMessage)) {
- final long timestamp = streamingMessage.getTimestamp();
- if (timestamp >= timeRange.getFirst() && timestamp < timeRange.getSecond()) {
- result.add(streamingMessage);
- } else if (timestamp < timeRange.getSecond() + margin) {
- //do nothing
- } else {
- logger.info("thread:" + Thread.currentThread() + " message timestamp:" + timestamp + " is out of time range:" + timeRange + " margin:" + margin);
- outOfMargin = true;
- break;
- }
- }
- }
- logger.info("Number of messages consumed: " + consumeMsgCount + " offset is: " + offset + " total fetch round: " + fetchRound);
- if (outOfMargin) {
- break;
- }
- if (consumeMsgCount == consumeMsgCountAtBeginning) {//nothing this round
- logger.info("no message consumed this round, wait 30s");
- Thread.sleep(30000);
- }
- }
- } catch (InterruptedException e) {
- logger.warn("this thread should not be interrupted, just stop fetching", e);
- }
- return result;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index e4c702d..633a30c 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -33,6 +33,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.kafka.util.ByteBufferBackedInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
deleted file mode 100644
index b1b4011..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.source.kafka.diagnose;
-
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.Maps;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractApplication;
-import org.apache.kylin.common.util.DaemonThreadFactory;
-import org.apache.kylin.common.util.OptionsHelper;
-import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.kafka.KafkaConfigManager;
-import org.apache.kylin.source.kafka.StreamingParser;
-import org.apache.kylin.source.kafka.TimedJsonStreamParser;
-import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.apache.kylin.source.kafka.util.KafkaRequester;
-import org.apache.kylin.source.kafka.util.KafkaUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-import kafka.api.OffsetRequest;
-import kafka.cluster.Broker;
-import kafka.javaapi.FetchResponse;
-import kafka.message.MessageAndOffset;
-
-/**
- * Continuously run this as a daemon to discover how "disordered" the kafka queue is.
- * This daemon only store a digest so it should not be space-consuming
- */
-public class KafkaInputAnalyzer extends AbstractApplication {
-
- public class KafkaMessagePuller implements Runnable {
-
- private final String topic;
- private final int partitionId;
- private final KafkaClusterConfig streamingConfig;
- private final LinkedBlockingQueue<StreamingMessage> streamQueue;
- private final StreamingParser streamingParser;
- private final Broker leadBroker;
- private long offset;
-
- protected final Logger logger;
-
- public KafkaMessagePuller(int clusterID, String topic, int partitionId, long startOffset, Broker leadBroker, KafkaClusterConfig kafkaClusterConfig, StreamingParser streamingParser) {
- this.topic = topic;
- this.partitionId = partitionId;
- this.streamingConfig = kafkaClusterConfig;
- this.offset = startOffset;
- this.logger = LoggerFactory.getLogger(topic + "_cluster_" + clusterID + "_" + partitionId);
- this.streamQueue = new LinkedBlockingQueue<StreamingMessage>(10000);
- this.streamingParser = streamingParser;
- this.leadBroker = leadBroker;
- }
-
- public BlockingQueue<StreamingMessage> getStreamQueue() {
- return streamQueue;
- }
-
- @Override
- public void run() {
- try {
- int consumeMsgCount = 0;
- int fetchRound = 0;
- while (true) {
- int consumeMsgCountAtBeginning = consumeMsgCount;
- fetchRound++;
-
- logger.info("fetching topic {} partition id {} offset {} leader {}", topic, String.valueOf(partitionId), String.valueOf(offset), leadBroker.toString());
-
- final FetchResponse fetchResponse = KafkaRequester.fetchResponse(topic, partitionId, offset, leadBroker, streamingConfig);
- if (fetchResponse.errorCode(topic, partitionId) != 0) {
- logger.warn("fetch response offset:" + offset + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
- Thread.sleep(30000);
- continue;
- }
-
- for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) {
- offset++;
- consumeMsgCount++;
-
- final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload());
- streamingMessage.setOffset(messageAndOffset.offset());
- if (streamingParser.filter(streamingMessage)) {
- streamQueue.add(streamingMessage);
- }
-
- }
- logger.info("Number of messages consumed: " + consumeMsgCount + " offset is: " + offset + " total fetch round: " + fetchRound);
-
- if (consumeMsgCount == consumeMsgCountAtBeginning) {//nothing this round
- Thread.sleep(30000);
- }
- }
- } catch (Exception e) {
- logger.error("consumer has encountered an error", e);
- }
- }
-
- }
-
- @SuppressWarnings("static-access")
- private static final Option OPTION_STREAMING = OptionBuilder.withArgName("streaming").hasArg().isRequired(true).withDescription("Name of the streaming").create("streaming");
- @SuppressWarnings("static-access")
- private static final Option OPTION_TASK = OptionBuilder.withArgName("task").hasArg().isRequired(true).withDescription("get delay or get disorder degree").create("task");
- @SuppressWarnings("static-access")
- private static final Option OPTION_TSCOLNAME = OptionBuilder.withArgName("tsColName").hasArg().isRequired(true).withDescription("field name of the ts").create("tsColName");
-
- private static final Logger logger = LoggerFactory.getLogger(KafkaInputAnalyzer.class);
-
- private StreamingParser parser;
- private KafkaConfig kafkaConfig;
-
- private Options options;
-
- public KafkaInputAnalyzer() {
- options = new Options();
- options.addOption(OPTION_STREAMING);
- options.addOption(OPTION_TASK);
- options.addOption(OPTION_TSCOLNAME);
-
- }
-
- private List<BlockingQueue<StreamingMessage>> consume(final int clusterID, final KafkaClusterConfig kafkaClusterConfig, final int partitionCount, long whichtime) {
- List<BlockingQueue<StreamingMessage>> result = Lists.newArrayList();
- for (int partitionId = 0; partitionId < partitionCount; ++partitionId) {
- final kafka.cluster.Broker leadBroker = KafkaUtils.getLeadBroker(kafkaClusterConfig, partitionId);
- long streamingOffset = KafkaRequester.getLastOffset(kafkaClusterConfig.getTopic(), partitionId, whichtime, leadBroker, kafkaClusterConfig);
- logger.info("starting offset:" + streamingOffset + " cluster id:" + clusterID + " partitionId:" + partitionId);
- KafkaMessagePuller consumer = new KafkaMessagePuller(clusterID, kafkaClusterConfig.getTopic(), partitionId, streamingOffset, leadBroker, kafkaClusterConfig, parser);
- Executors.newSingleThreadExecutor(new DaemonThreadFactory()).submit(consumer);
- result.add(consumer.getStreamQueue());
- }
- return result;
- }
-
- private List<BlockingQueue<StreamingMessage>> consumeAll(long whichtime) {
- int clusterId = 0;
- final List<BlockingQueue<StreamingMessage>> queues = Lists.newLinkedList();
-
- for (final KafkaClusterConfig kafkaClusterConfig : kafkaConfig.getKafkaClusterConfigs()) {
- final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
- final List<BlockingQueue<StreamingMessage>> oneClusterQueue = consume(clusterId, kafkaClusterConfig, partitionCount, whichtime);
- queues.addAll(oneClusterQueue);
- logger.info("Cluster {} with {} partitions", clusterId, oneClusterQueue.size());
- clusterId++;
- }
- return queues;
- }
-
- private void analyzeLatency() throws InterruptedException {
- long[] intervals = new long[] { 1, 5, 60, 300, 1800 };
- final List<BlockingQueue<StreamingMessage>> allPartitionData = consumeAll(OffsetRequest.LatestTime());
- final List<TimeHistogram> allHistograms = Lists.newArrayList();
- final TimeHistogram overallHistogram = new TimeHistogram(intervals, "overall");
-
- ExecutorService executorService = Executors.newFixedThreadPool(allPartitionData.size(), new DaemonThreadFactory());
- for (int i = 0; i < allPartitionData.size(); ++i) {
- final int index = i;
- allHistograms.add(new TimeHistogram(intervals, "" + i));
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- while (true) {
- try {
- StreamingMessage message = allPartitionData.get(index).take();
- long t = message.getTimestamp();
- allHistograms.get(index).processMillis(System.currentTimeMillis() - t);
- overallHistogram.processMillis(System.currentTimeMillis() - t);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- });
- }
-
- while (true) {
- System.out.println("Printing status at : " + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(Calendar.getInstance().getTime()));
-
- for (TimeHistogram histogram : allHistograms) {
- histogram.printStatus();
- }
- overallHistogram.printStatus();
- Thread.sleep(300000);
- }
- }
-
- private void analyzeDisorder() throws InterruptedException {
- final List<BlockingQueue<StreamingMessage>> allPartitionData = consumeAll(OffsetRequest.EarliestTime());
-
- final List<Long> wallClocks = Lists.newArrayList();
- final List<Long> wallOffset = Lists.newArrayList();
- final List<Long> maxDisorderTime = Lists.newArrayList();
- final List<Long> maxDisorderOffset = Lists.newArrayList();
- final List<Long> processedMessages = Lists.newArrayList();
-
- for (int i = 0; i < allPartitionData.size(); i++) {
- wallClocks.add(0L);
- wallOffset.add(0L);
- maxDisorderTime.add(0L);
- maxDisorderOffset.add(0L);
- processedMessages.add(0L);
- }
-
- ExecutorService executorService = Executors.newFixedThreadPool(allPartitionData.size(), new DaemonThreadFactory());
- final CountDownLatch countDownLatch = new CountDownLatch(allPartitionData.size());
- for (int i = 0; i < allPartitionData.size(); ++i) {
- final int index = i;
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- try {
- while (true) {
- StreamingMessage message = allPartitionData.get(index).poll(60, TimeUnit.SECONDS);
- if (message == null) {
- System.out.println(String.format("Thread %d is exiting", index));
- break;
- }
- long t = message.getTimestamp();
- long offset = message.getOffset();
- if (t < wallClocks.get(index)) {
- maxDisorderTime.set(index, Math.max(wallClocks.get(index) - t, maxDisorderTime.get(index)));
- maxDisorderOffset.set(index, Math.max(offset - wallOffset.get(index), maxDisorderOffset.get(index)));
- } else {
- wallClocks.set(index, t);
- wallOffset.set(index, offset);
- }
- processedMessages.set(index, processedMessages.get(index) + 1);
-
- if (processedMessages.get(index) % 10000 == 1) {
- System.out.println(String.format("Thread %d processed %d messages. Max disorder time is %d , max disorder offset is %d", //
- index, processedMessages.get(index), maxDisorderTime.get(index), maxDisorderOffset.get(index)));
- }
- }
-
- System.out.println(String.format("Thread %d finishes after %d messages. Max disorder time is %d , max disorder offset is %d", //
- index, processedMessages.get(index), maxDisorderTime.get(index), maxDisorderOffset.get(index)));
- countDownLatch.countDown();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- }
-
- countDownLatch.await();
- }
-
- @Override
- protected Options getOptions() {
- return options;
- }
-
- @Override
- protected void execute(OptionsHelper optionsHelper) throws Exception {
-
- String streaming = optionsHelper.getOptionValue(OPTION_STREAMING);
- String task = optionsHelper.getOptionValue(OPTION_TASK);
- String tsColName = optionsHelper.getOptionValue(OPTION_TSCOLNAME);
-
- Map<String, String> properties = Maps.newHashMap();
- properties.put(StreamingParser.PROPERTY_TS_COLUMN_NAME, tsColName);
- kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(streaming);
- parser = new TimedJsonStreamParser(Lists.<TblColRef> newArrayList(), properties);
-
- if ("disorder".equalsIgnoreCase(task)) {
- analyzeDisorder();
- } else if ("delay".equalsIgnoreCase(task)) {
- analyzeLatency();
- } else {
- optionsHelper.printUsage(this.getClass().getName(), options);
- }
- }
-
- public static void main(String[] args) {
- KafkaInputAnalyzer analyzer = new KafkaInputAnalyzer();
- analyzer.execute(args);
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaVerify.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaVerify.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaVerify.java
deleted file mode 100644
index 6a456bc..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaVerify.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.source.kafka.diagnose;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.MapType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-
-/**
- * only for verify kylin streaming's correctness by comparing to data in original kafka topic
- */
-public class KafkaVerify {
-
- public static void main(String[] args) throws IOException {
-
- System.out.println("start");
-
- ObjectMapper mapper = new ObjectMapper();
- JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
-
- long start = Long.valueOf(args[0]);
- long end = Long.valueOf(args[1]);
- long interval = Long.valueOf(args[2]);
- int bucket = (int) ((end - start + interval - 1) / interval);
-
- long[] qtySum = new long[bucket];
- long qtyTotal = 0;
- long[] counts = new long[bucket];
- long countTotal = 0;
- long processed = 0;
- long minOffset = -1;
- long maxOffset = -1;
-
- try (BufferedReader br = new BufferedReader(new FileReader(new File(args[3])))) {
- String s;
- while ((s = br.readLine()) != null) {
- // process the line.
- if (++processed % 10000 == 1) {
- System.out.println("processing " + processed);
- }
-
- Map<String, String> root = mapper.readValue(s, mapType);
- String tsStr = root.get("sys_ts");
-
- if (StringUtils.isEmpty(tsStr)) {
- continue;
- }
- long ts = Long.valueOf(tsStr);
- if (ts < start || ts >= end) {
- continue;
- }
-
- if (minOffset == -1) {
- minOffset = processed - 1;
- }
- maxOffset = processed - 1;
-
- long qty = Long.valueOf(root.get("qty"));
- int index = (int) ((ts - start) / interval);
- qtySum[index] += qty;
- qtyTotal += qty;
- counts[index]++;
- countTotal++;
- }
- }
-
- System.out.println("qty sum is " + Arrays.toString(qtySum));
- System.out.println("qty total is " + qtyTotal);
- System.out.println("count is " + Arrays.toString(counts));
- System.out.println("count total is " + countTotal);
- System.out.println("first processed is " + minOffset);
- System.out.println("last processed is " + maxOffset);
- }
-}