Posted to commits@kylin.apache.org by qh...@apache.org on 2015/06/05 06:35:49 UTC
[1/3] incubator-kylin git commit: add support for commit offset for multi partitions
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 dda27de23 -> 4f7b1bffa
add support for commit offset for multi partitions
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/6133d735
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/6133d735
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/6133d735
Branch: refs/heads/0.8.0
Commit: 6133d7356426cf2229ac0d1122227ddcf2c7a3bc
Parents: dda27de
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Jun 4 18:14:56 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Jun 5 11:56:23 2015 +0800
----------------------------------------------------------------------
.../kylin/streaming/StreamingManager.java | 48 ++++++++++++++++++++
.../kylin/streaming/StreamingManagerTest.java | 37 +++++++++++++++
2 files changed, 85 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6133d735/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
index f025216..3552edc 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
@@ -34,13 +34,23 @@
package org.apache.kylin.streaming;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.SimpleType;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -95,6 +105,10 @@ public class StreamingManager {
return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + partition + ".json";
}
+ private String formatStreamingOutputPath(String streaming, List<Integer> partitions) {
+ return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json";
+ }
+
public boolean createOrUpdateKafkaConfig(String name, StreamingConfig config) {
try {
@@ -142,4 +156,38 @@ public class StreamingManager {
}
}
+ public Map<Integer, Long> getOffset(String streaming, List<Integer> partitions) {
+ Collections.sort(partitions);
+ final String resPath = formatStreamingOutputPath(streaming, partitions);
+ try {
+ final InputStream inputStream = getStore().getResource(resPath);
+ if (inputStream == null) {
+ return Collections.emptyMap();
+ }
+ final HashMap<Integer, Long> result = mapper.readValue(inputStream, mapType);
+ return result;
+ } catch (IOException e) {
+ logger.error("error get offset, path:" + resPath, e);
+ throw new RuntimeException("error get offset, path:" + resPath, e);
+ }
+ }
+
+ public void updateOffset(String streaming, HashMap<Integer, Long> offset) {
+ List<Integer> partitions = Lists.newLinkedList(offset.keySet());
+ Collections.sort(partitions);
+ final String resPath = formatStreamingOutputPath(streaming, partitions);
+ try {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ mapper.writeValue(baos, offset);
+ getStore().putResource(resPath, new ByteArrayInputStream(baos.toByteArray()), getStore().getResourceTimestamp(resPath));
+ } catch (IOException e) {
+ logger.error("error update offset, path:" + resPath, e);
+ throw new RuntimeException("error update offset, path:" + resPath, e);
+ }
+ }
+
+ private final ObjectMapper mapper = new ObjectMapper();
+ private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(Integer.class), SimpleType.construct(Long.class));
+
+
}
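
A minimal usage sketch of the multi-partition offset API added above. The streaming name "kafka_test" and the offset values are made-up; StreamingManager.getInstance, getOffset and updateOffset are the ones from the diff. Note that getOffset sorts the partition list before building the path, so the backing resource is named from the sorted ids (e.g. STREAMING_OUTPUT_RESOURCE_ROOT + "/kafka_test_0_1_2.json") regardless of the order passed in.

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.streaming.StreamingManager;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OffsetApiSketch {
    public static void main(String[] args) {
        StreamingManager mgr = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv());

        // Persist the last consumed offset of every partition in one call.
        HashMap<Integer, Long> offsets = Maps.newHashMap();
        offsets.put(0, 1000L);
        offsets.put(1, 1200L);
        offsets.put(2, 900L);
        mgr.updateOffset("kafka_test", offsets);

        // Read them back by partition id; an empty map means nothing has
        // been committed for this partition set yet.
        List<Integer> partitions = Lists.newArrayList(2, 0, 1);
        Map<Integer, Long> committed = mgr.getOffset("kafka_test", partitions);
    }
}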
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6133d735/streaming/src/test/java/org/apache/kylin/streaming/StreamingManagerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/StreamingManagerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/StreamingManagerTest.java
index 772643b..7a53060 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/StreamingManagerTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/StreamingManagerTest.java
@@ -34,12 +34,20 @@
package org.apache.kylin.streaming;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import junit.framework.TestCase;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
import static junit.framework.Assert.assertEquals;
import static junit.framework.TestCase.assertNotNull;
import static org.junit.Assert.fail;
@@ -82,11 +90,40 @@ public class StreamingManagerTest extends LocalFileMetadataTestCase {
updateOffsetAndCompare(streaming, partition, 1000);
updateOffsetAndCompare(streaming, partition, 800);
updateOffsetAndCompare(streaming, partition, 2000);
+ }
+
+ @Test
+ public void testMultiOffset() {
+ final String streaming = "kafka_multi_test";
+ List<Integer> partitions = Lists.newArrayList(Lists.asList(0, 1, new Integer[]{2, 3}));
+ assertEquals(0, streamingManager.getOffset("kafka_multi_test", partitions).size());
+ for (int i = 0; i < 10; i++) {
+ updateOffsetAndCompare(streaming, generateRandomOffset(partitions));
+ }
+ }
+
+ private HashMap<Integer, Long> generateRandomOffset(List<Integer> partitions) {
+ final HashMap<Integer, Long> result = Maps.newHashMap();
+ final Random random = new Random();
+ for (Integer partition : partitions) {
+ result.put(partition, random.nextLong());
+ }
+ return result;
}
private void updateOffsetAndCompare(String streaming, int partition, long offset) {
streamingManager.updateOffset(streaming, partition, offset);
assertEquals(offset, streamingManager.getOffset(streaming, partition));
}
+
+ private void updateOffsetAndCompare(String streaming, HashMap<Integer, Long> offset) {
+ streamingManager.updateOffset(streaming, offset);
+ final Map<Integer, Long> result = streamingManager.getOffset(streaming, Lists.newLinkedList(offset.keySet()));
+ System.out.println(result);
+ assertEquals(offset.size(), result.size());
+ for (Integer partition : result.keySet()) {
+ assertEquals(offset.get(partition), result.get(partition));
+ }
+ }
}
[2/3] incubator-kylin git commit: refactor
Posted by qh...@apache.org.
refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/11e9d3e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/11e9d3e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/11e9d3e9
Branch: refs/heads/0.8.0
Commit: 11e9d3e92d3e886006ee526a2b3581bcf00f2bfe
Parents: 6133d73
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Jun 4 20:29:30 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Jun 5 11:57:01 2015 +0800
----------------------------------------------------------------------
.../kylin/job/streaming/StreamingBootstrap.java | 38 +++++++++++++++-----
.../apache/kylin/job/BuildIIWithStreamTest.java | 3 +-
.../kylin/job/hadoop/invertedindex/IITest.java | 2 +-
.../job/streaming/CubeStreamConsumerTest.java | 2 +-
.../kylin/streaming/MicroStreamBatch.java | 10 +++++-
.../apache/kylin/streaming/StreamBuilder.java | 31 +++++++++++++---
6 files changed, 69 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11e9d3e9/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index fd590d2..436c55b 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -62,6 +62,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.lang.reflect.Constructor;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@@ -135,16 +136,21 @@ public class StreamingBootstrap {
}
}
- private List<BlockingQueue<StreamMessage>> consume(int clusterID, KafkaClusterConfig kafkaClusterConfig, final int partitionCount) {
+ private List<BlockingQueue<StreamMessage>> consume(int clusterID, KafkaClusterConfig kafkaClusterConfig, final int partitionCount, final Map<Integer, Long> partitionIdOffsetMap, final int partitionOffset) {
List<BlockingQueue<StreamMessage>> result = Lists.newArrayList();
for (int partitionId = 0; partitionId < partitionCount; ++partitionId) {
final Broker leadBroker = getLeadBroker(kafkaClusterConfig, partitionId);
-
+ final int transferredPartitionId = partitionId + partitionOffset;
final long latestOffset = KafkaRequester.getLastOffset(kafkaClusterConfig.getTopic(), partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaClusterConfig);
long streamingOffset = latestOffset;
- logger.info("submitting offset:" + streamingOffset);
-
- KafkaConsumer consumer = new KafkaConsumer(clusterID, kafkaClusterConfig.getTopic(), partitionId, streamingOffset, kafkaClusterConfig.getBrokers(), kafkaClusterConfig, 1);
+ if (partitionIdOffsetMap.containsKey(transferredPartitionId)) {
+ final long earliestOffset = KafkaRequester.getLastOffset(kafkaClusterConfig.getTopic(), partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaClusterConfig);
+ long committedOffset = partitionIdOffsetMap.get(transferredPartitionId);
+ Preconditions.checkArgument(committedOffset >= earliestOffset && committedOffset <= latestOffset, String.format("invalid offset:%d, earliestOffset:%d, latestOffset:%d", committedOffset, earliestOffset, latestOffset));
+ streamingOffset = committedOffset;
+ }
+ logger.info("starting offset:" + streamingOffset + " cluster id:" + clusterID + " partitionId:" + partitionId + " transferredPartitionId:" + transferredPartitionId);
+ KafkaConsumer consumer = new KafkaConsumer(clusterID, kafkaClusterConfig.getTopic(), partitionId, streamingOffset, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);
Executors.newSingleThreadExecutor().submit(consumer);
result.add(consumer.getStreamQueue(0));
}
@@ -156,15 +162,28 @@ public class StreamingBootstrap {
final List<BlockingQueue<StreamMessage>> allClustersData = Lists.newArrayList();
+ ArrayList<Integer> allPartitions = Lists.newArrayList();
+ int partitionOffset = 0;
+ for (KafkaClusterConfig kafkaClusterConfig : kafkaClusterConfigs) {
+ final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
+ for (int i = 0; i < partitionCount; i++) {
+ allPartitions.add(i + partitionOffset);
+ }
+ partitionOffset += partitionCount;
+ }
+ final Map<Integer, Long> partitionIdOffsetMap = streamingManager.getOffset(streamingConfig.getName(), allPartitions);
+
int clusterID = 0;
+ partitionOffset = 0;
for (KafkaClusterConfig kafkaClusterConfig : kafkaClusterConfigs) {
final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
Preconditions.checkArgument(partitionId >= 0 && partitionId < partitionCount, "invalid partition id:" + partitionId);
- final List<BlockingQueue<StreamMessage>> oneClusterData = consume(clusterID, kafkaClusterConfig, partitionCount);
+ final List<BlockingQueue<StreamMessage>> oneClusterData = consume(clusterID, kafkaClusterConfig, partitionCount, partitionIdOffsetMap, partitionOffset);
logger.info("Cluster {} with {} partitions", allClustersData.size(), oneClusterData.size());
allClustersData.addAll(oneClusterData);
clusterID++;
+ partitionOffset += partitionCount;
}
final String cubeName = streamingConfig.getCubeName();
@@ -174,7 +193,7 @@ public class StreamingBootstrap {
MicroBatchCondition condition = new MicroBatchCondition(Integer.MAX_VALUE, batchInterval);
long startTimestamp = cubeInstance.getDateRangeEnd() == 0 ? TimeUtil.getNextPeriodStart(System.currentTimeMillis(), (long) batchInterval) : cubeInstance.getDateRangeEnd();
logger.info("batch time interval is {} to {}", DateFormat.formatToTimeStr(startTimestamp), DateFormat.formatToTimeStr(startTimestamp + batchInterval));
- StreamBuilder cubeStreamBuilder = new StreamBuilder(allClustersData, condition, new CubeStreamConsumer(cubeName), startTimestamp);
+ StreamBuilder cubeStreamBuilder = new StreamBuilder(streamingConfig.getName(), allClustersData, condition, new CubeStreamConsumer(cubeName), startTimestamp);
cubeStreamBuilder.setStreamParser(getStreamParser(streamingConfig, Lists.transform(new CubeJoinedFlatTableDesc(cubeInstance.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() {
@Nullable
@Override
@@ -242,7 +261,10 @@ public class StreamingBootstrap {
Executors.newSingleThreadExecutor().submit(consumer);
final ExecutorService streamingBuilderPool = Executors.newFixedThreadPool(parallelism);
for (int i = startShard; i < endShard; ++i) {
- final StreamBuilder task = new StreamBuilder(consumer.getStreamQueue(i % parallelism), new MicroBatchCondition(iiDesc.getSliceSize(), Integer.MAX_VALUE), new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i), 0L);
+ final StreamBuilder task = new StreamBuilder(streamingConfig.getName(),
+ consumer.getStreamQueue(i % parallelism),
+ new MicroBatchCondition(iiDesc.getSliceSize(), Integer.MAX_VALUE),
+ new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i), 0L);
task.setStreamParser(getStreamParser(streamingConfig, ii.getDescriptor().listAllColumns()));
if (i == endShard - 1) {
streamingBuilderPool.submit(task).get();
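
The consume(...) change above resolves each partition's start offset from the committed map, keyed by the "transferred" partition id (the local id shifted by the partition counts of the preceding clusters, so all clusters share one id space). A condensed paraphrase of that resolution, with the Kafka offset lookups replaced by plain arguments; the helper itself is hypothetical:

import java.util.Map;

public class ResumeOffsetSketch {
    // Returns the offset to start consuming from. Mirrors the diff: no
    // committed entry means "start from the latest offset"; a committed
    // offset outside Kafka's retained [earliest, latest] range fails fast
    // (the diff uses Preconditions.checkArgument for this case).
    static long resolveStartOffset(Map<Integer, Long> committed, int transferredPartitionId,
                                   long earliestOffset, long latestOffset) {
        Long saved = committed.get(transferredPartitionId);
        if (saved == null) {
            return latestOffset;
        }
        if (saved < earliestOffset || saved > latestOffset) {
            throw new IllegalArgumentException(String.format(
                    "invalid offset:%d, earliestOffset:%d, latestOffset:%d",
                    saved, earliestOffset, latestOffset));
        }
        return saved;
    }
}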
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11e9d3e9/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index e84b176..6d660e5 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -214,7 +214,8 @@ public class BuildIIWithStreamTest {
ToolRunner.run(new IICreateHTableJob(), args);
ExecutorService executorService = Executors.newSingleThreadExecutor();
- final StreamBuilder streamBuilder = new StreamBuilder(queue,
+ final StreamBuilder streamBuilder = new StreamBuilder(iiName,
+ queue,
new MicroBatchCondition(segment.getIIDesc().getSliceSize(), Integer.MAX_VALUE),
new IIStreamConsumer(iiName, segment.getStorageLocationIdentifier(), segment.getIIDesc(), 0),
0);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11e9d3e9/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index b2fb5ea..9a28ea4 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -85,7 +85,7 @@ public class IITest extends LocalFileMetadataTestCase {
List<List<String>> parsedStreamMessages = Lists.newArrayList();
StreamParser parser = StringStreamParser.instance;
- MicroStreamBatch batch = new MicroStreamBatch();
+ MicroStreamBatch batch = new MicroStreamBatch(0);
for (StreamMessage message : streamMessages) {
ParsedStreamMessage parsedStreamMessage = parser.parse(message);
if ((parsedStreamMessage.isAccepted())) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11e9d3e9/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
index 328ec72..ca0d037 100644
--- a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
@@ -64,7 +64,7 @@ public class CubeStreamConsumerTest {
@Test
public void test() throws Exception {
LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
- StreamBuilder cubeStreamBuilder = new StreamBuilder(queue, new MicroBatchCondition(Integer.MAX_VALUE, 30 * 1000), new CubeStreamConsumer(CUBE_NAME), System.currentTimeMillis());
+ StreamBuilder cubeStreamBuilder = new StreamBuilder(CUBE_NAME, queue, new MicroBatchCondition(Integer.MAX_VALUE, 30 * 1000), new CubeStreamConsumer(CUBE_NAME), System.currentTimeMillis());
final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
loadDataFromLocalFile(queue, 100000);
future.get();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11e9d3e9/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
index e1ff60c..6ecd592 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
@@ -9,6 +9,8 @@ import java.util.List;
*/
public final class MicroStreamBatch {
+ private final int partitionId;
+
private final List<List<String>> streams;
private final Pair<Long, Long> timestamp;
@@ -17,18 +19,24 @@ public final class MicroStreamBatch {
private int rawMessageCount;
- public MicroStreamBatch() {
+ public MicroStreamBatch(int partitionId) {
+ this.partitionId = partitionId;
this.streams = Lists.newLinkedList();
this.timestamp = Pair.newPair(Long.MAX_VALUE, Long.MIN_VALUE);
this.offset = Pair.newPair(Long.MAX_VALUE, Long.MIN_VALUE);
}
private MicroStreamBatch(MicroStreamBatch batch) {
+ this.partitionId = batch.partitionId;
this.streams = Lists.newLinkedList(batch.streams);
this.timestamp = Pair.newPair(batch.timestamp.getFirst(), batch.timestamp.getSecond());
this.offset = Pair.newPair(batch.offset.getFirst(), batch.offset.getSecond());
}
+ public int getPartitionId() {
+ return partitionId;
+ }
+
public final List<List<String>> getStreams() {
return streams;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11e9d3e9/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index 8e5b0ca..cb8dfb1 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -36,11 +36,14 @@ package org.apache.kylin.streaming;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.DateFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.concurrent.*;
@@ -59,8 +62,11 @@ public class StreamBuilder implements Runnable {
private final MicroBatchCondition condition;
- public StreamBuilder(List<BlockingQueue<StreamMessage>> inputs, MicroBatchCondition condition, MicroStreamBatchConsumer consumer, long startTimestamp) {
+ private final String streaming;
+
+ public StreamBuilder(String streaming, List<BlockingQueue<StreamMessage>> inputs, MicroBatchCondition condition, MicroStreamBatchConsumer consumer, long startTimestamp) {
Preconditions.checkArgument(inputs.size() > 0);
+ this.streaming = streaming;
this.streamMessageQueues = Lists.newArrayList();
this.consumer = Preconditions.checkNotNull(consumer);
this.condition = condition;
@@ -68,7 +74,8 @@ public class StreamBuilder implements Runnable {
init(inputs);
}
- public StreamBuilder(BlockingQueue<StreamMessage> input, MicroBatchCondition condition, MicroStreamBatchConsumer consumer, long startTimestamp) {
+ public StreamBuilder(String streaming, BlockingQueue<StreamMessage> input, MicroBatchCondition condition, MicroStreamBatchConsumer consumer, long startTimestamp) {
+ this.streaming = streaming;
this.streamMessageQueues = Lists.newArrayList();
this.consumer = Preconditions.checkNotNull(consumer);
this.condition = condition;
@@ -90,11 +97,16 @@ public class StreamBuilder implements Runnable {
final int inputCount = streamMessageQueues.size();
final ExecutorService executorService = Executors.newFixedThreadPool(inputCount);
long start = startTimestamp;
+ List<Integer> partitions = Lists.newArrayList();
+ for (int i = 0, partitionCount = streamMessageQueues.size(); i < partitionCount; i++) {
+ partitions.add(i);
+ }
while (true) {
CountDownLatch countDownLatch = new CountDownLatch(inputCount);
ArrayList<Future<MicroStreamBatch>> futures = Lists.newArrayListWithExpectedSize(inputCount);
+ int partitionId = 0;
for (BlockingQueue<StreamMessage> streamMessageQueue : streamMessageQueues) {
- futures.add(executorService.submit(new StreamFetcher(streamMessageQueue, countDownLatch, start, start + condition.getBatchInterval())));
+ futures.add(executorService.submit(new StreamFetcher(partitionId++, streamMessageQueue, countDownLatch, start, start + condition.getBatchInterval())));
}
countDownLatch.await();
ArrayList<MicroStreamBatch> batches = Lists.newArrayListWithExpectedSize(inputCount);
@@ -126,6 +138,13 @@ public class StreamBuilder implements Runnable {
long startTime = System.currentTimeMillis();
consumer.consume(batch);
logger.info("Batch build costs {} milliseconds", System.currentTimeMillis() - startTime);
+ if (batches.size() > 1) {
+ final HashMap<Integer, Long> offset = Maps.newHashMap();
+ for (MicroStreamBatch microStreamBatch : batches) {
+ offset.put(microStreamBatch.getPartitionId(), microStreamBatch.getOffset().getSecond());
+ }
+ StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).updateOffset(streaming, offset);
+ }
}
}
} catch (InterruptedException e) {
@@ -143,10 +162,12 @@ public class StreamBuilder implements Runnable {
private final BlockingQueue<StreamMessage> streamMessageQueue;
private final CountDownLatch countDownLatch;
+ private final int partitionId;
private long startTimestamp;
private long endTimestamp;
- public StreamFetcher(BlockingQueue<StreamMessage> streamMessageQueue, CountDownLatch countDownLatch, long startTimestamp, long endTimestamp) {
+ public StreamFetcher(int partitionId, BlockingQueue<StreamMessage> streamMessageQueue, CountDownLatch countDownLatch, long startTimestamp, long endTimestamp) {
+ this.partitionId = partitionId;
this.streamMessageQueue = streamMessageQueue;
this.countDownLatch = countDownLatch;
this.startTimestamp = startTimestamp;
@@ -183,7 +204,7 @@ public class StreamBuilder implements Runnable {
MicroStreamBatch microStreamBatch = null;
while (true) {
if (microStreamBatch == null) {
- microStreamBatch = new MicroStreamBatch();
+ microStreamBatch = new MicroStreamBatch(partitionId);
clearCounter();
}
StreamMessage streamMessage = peek(streamMessageQueue, 30000);
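
After a multi-partition micro batch is consumed, the run() loop above persists every partition's end offset in a single map. A condensed, self-contained version of that commit step (the helper is hypothetical; MicroStreamBatch and its Pair-typed getOffset() are the ones from the diff):

import com.google.common.collect.Maps;
import org.apache.kylin.streaming.MicroStreamBatch;

import java.util.HashMap;
import java.util.List;

public class CommitOffsetsSketch {
    // Collect the end offset of each partition's micro batch, keyed by
    // partition id; the result is what updateOffset(streaming, offset)
    // writes to the resource store.
    static HashMap<Integer, Long> collectEndOffsets(List<MicroStreamBatch> batches) {
        HashMap<Integer, Long> offset = Maps.newHashMap();
        for (MicroStreamBatch batch : batches) {
            // getOffset() is a Pair of the (first, last) offsets seen in the
            // batch; the second element is the last offset recorded.
            offset.put(batch.getPartitionId(), batch.getOffset().getSecond());
        }
        return offset;
    }
}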
[3/3] incubator-kylin git commit: refactor
Posted by qh...@apache.org.
refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/4f7b1bff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/4f7b1bff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/4f7b1bff
Branch: refs/heads/0.8.0
Commit: 4f7b1bffa31773e434258db7532d6eb0944b94fe
Parents: 11e9d3e
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri Jun 5 12:32:15 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Jun 5 12:32:15 2015 +0800
----------------------------------------------------------------------
.../kylin/job/streaming/CubeStreamConsumer.java | 2 +
.../kylin/job/streaming/StreamingBootstrap.java | 10 +--
.../apache/kylin/job/BuildIIWithStreamTest.java | 7 +-
.../job/streaming/CubeStreamConsumerTest.java | 6 +-
.../apache/kylin/streaming/BatchCondition.java | 16 ++++
.../kylin/streaming/LimitedSizeCondition.java | 29 +++++++
.../kylin/streaming/MicroBatchCondition.java | 22 ------
.../apache/kylin/streaming/StreamBuilder.java | 82 +++++++++++++-------
.../kylin/streaming/TimePeriodCondition.java | 30 +++++++
9 files changed, 142 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
index bd7c6cb..03dd92a 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -96,7 +96,9 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
final CubeDesc cubeDesc = cubeInstance.getDescriptor();
final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), microStreamBatch.getTimestamp().getSecond(), false, false);
+ long start = System.currentTimeMillis();
final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(cubeInstance.getDescriptor(), parsedStreamMessages);
+ logger.info(String.format("sampling of %d messages cost %d ms", parsedStreamMessages.size(), (System.currentTimeMillis() - start)));
final Configuration conf = HadoopUtil.getCurrentConfiguration();
final Path outputPath = new Path("file:///tmp/cuboidstatistics/" + UUID.randomUUID().toString());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 436c55b..e28fa9d 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -190,10 +190,9 @@ public class StreamingBootstrap {
final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
int batchInterval = 5 * 60 * 1000;
- MicroBatchCondition condition = new MicroBatchCondition(Integer.MAX_VALUE, batchInterval);
long startTimestamp = cubeInstance.getDateRangeEnd() == 0 ? TimeUtil.getNextPeriodStart(System.currentTimeMillis(), (long) batchInterval) : cubeInstance.getDateRangeEnd();
logger.info("batch time interval is {} to {}", DateFormat.formatToTimeStr(startTimestamp), DateFormat.formatToTimeStr(startTimestamp + batchInterval));
- StreamBuilder cubeStreamBuilder = new StreamBuilder(streamingConfig.getName(), allClustersData, condition, new CubeStreamConsumer(cubeName), startTimestamp);
+ StreamBuilder cubeStreamBuilder = StreamBuilder.newPeriodicalStreamBuilder(streamingConfig.getName(), allClustersData, new CubeStreamConsumer(cubeName), startTimestamp, batchInterval);
cubeStreamBuilder.setStreamParser(getStreamParser(streamingConfig, Lists.transform(new CubeJoinedFlatTableDesc(cubeInstance.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() {
@Nullable
@Override
@@ -261,10 +260,11 @@ public class StreamingBootstrap {
Executors.newSingleThreadExecutor().submit(consumer);
final ExecutorService streamingBuilderPool = Executors.newFixedThreadPool(parallelism);
for (int i = startShard; i < endShard; ++i) {
- final StreamBuilder task = new StreamBuilder(streamingConfig.getName(),
+ final StreamBuilder task = StreamBuilder.newLimitedSizeStreamBuilder(streamingConfig.getName(),
consumer.getStreamQueue(i % parallelism),
- new MicroBatchCondition(iiDesc.getSliceSize(), Integer.MAX_VALUE),
- new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i), 0L);
+ new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i),
+ 0L,
+ iiDesc.getSliceSize());
task.setStreamParser(getStreamParser(streamingConfig, ii.getDescriptor().listAllColumns()));
if (i == endShard - 1) {
streamingBuilderPool.submit(task).get();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index 6d660e5..9693771 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -58,7 +58,6 @@ import org.apache.kylin.job.hadoop.cube.StorageCleanupJob;
import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.streaming.MicroBatchCondition;
import org.apache.kylin.streaming.StreamBuilder;
import org.apache.kylin.streaming.StreamMessage;
import org.apache.kylin.streaming.invertedindex.IIStreamConsumer;
@@ -214,11 +213,11 @@ public class BuildIIWithStreamTest {
ToolRunner.run(new IICreateHTableJob(), args);
ExecutorService executorService = Executors.newSingleThreadExecutor();
- final StreamBuilder streamBuilder = new StreamBuilder(iiName,
+ final StreamBuilder streamBuilder = StreamBuilder.newLimitedSizeStreamBuilder(iiName,
queue,
- new MicroBatchCondition(segment.getIIDesc().getSliceSize(), Integer.MAX_VALUE),
new IIStreamConsumer(iiName, segment.getStorageLocationIdentifier(), segment.getIIDesc(), 0),
- 0);
+ 0,
+ segment.getIIDesc().getSliceSize());
List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn());
int count = sorted.size();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
index ca0d037..8e3eb61 100644
--- a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
@@ -1,5 +1,6 @@
package org.apache.kylin.job.streaming;
+import com.google.common.collect.Lists;
import org.apache.hadoop.util.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractKylinTestCase;
@@ -10,7 +11,6 @@ import org.apache.kylin.cube.CubeBuilder;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.job.DeployUtil;
-import org.apache.kylin.streaming.MicroBatchCondition;
import org.apache.kylin.streaming.StreamBuilder;
import org.apache.kylin.streaming.StreamMessage;
import org.junit.Before;
@@ -64,7 +64,9 @@ public class CubeStreamConsumerTest {
@Test
public void test() throws Exception {
LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
- StreamBuilder cubeStreamBuilder = new StreamBuilder(CUBE_NAME, queue, new MicroBatchCondition(Integer.MAX_VALUE, 30 * 1000), new CubeStreamConsumer(CUBE_NAME), System.currentTimeMillis());
+ List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
+ queues.add(queue);
+ StreamBuilder cubeStreamBuilder = StreamBuilder.newPeriodicalStreamBuilder(CUBE_NAME, queues, new CubeStreamConsumer(CUBE_NAME), System.currentTimeMillis(), 30L * 1000);
final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
loadDataFromLocalFile(queue, 100000);
future.get();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
new file mode 100644
index 0000000..0fa11c1
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
@@ -0,0 +1,16 @@
+package org.apache.kylin.streaming;
+
+/**
+ */
+public interface BatchCondition {
+
+ enum Result {
+ ACCEPT,
+ REJECT,
+ DISCARD
+ }
+
+ Result apply(ParsedStreamMessage message);
+
+ BatchCondition copy();
+}
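
BatchCondition classifies each parsed message as ACCEPT (add it to the current batch), DISCARD (consume it but drop it, e.g. a late message), or REJECT (close the batch and leave the message queued). A hypothetical composite, not part of this commit, showing how implementations can stack, e.g. "close after N messages or at the end of the time window, whichever comes first":

package org.apache.kylin.streaming;

/**
 * Hypothetical example: delegates to two conditions, accepting a message
 * only when both accept it; REJECT or DISCARD from the first condition
 * wins without consulting the second.
 */
public class CompositeCondition implements BatchCondition {

    private final BatchCondition first;
    private final BatchCondition second;

    public CompositeCondition(BatchCondition first, BatchCondition second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public Result apply(ParsedStreamMessage message) {
        Result r = first.apply(message);
        if (r != Result.ACCEPT) {
            return r;
        }
        return second.apply(message);
    }

    @Override
    public BatchCondition copy() {
        // copy() lets each new batch start with fresh state
        // (e.g. LimitedSizeCondition's counter).
        return new CompositeCondition(first.copy(), second.copy());
    }
}

Usage could look like new CompositeCondition(new TimePeriodCondition(start, start + interval), new LimitedSizeCondition(50000)).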
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java
new file mode 100644
index 0000000..3c1e367
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java
@@ -0,0 +1,29 @@
+package org.apache.kylin.streaming;
+
+/**
+ */
+public class LimitedSizeCondition implements BatchCondition {
+
+ private final int limit;
+ private int count;
+
+ public LimitedSizeCondition(int limit) {
+ this.limit = limit;
+ this.count = 0;
+ }
+
+ @Override
+ public Result apply(ParsedStreamMessage message) {
+ if (count < limit) {
+ count++;
+ return Result.ACCEPT;
+ } else {
+ return Result.REJECT;
+ }
+ }
+
+ @Override
+ public BatchCondition copy() {
+ return new LimitedSizeCondition(this.limit);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/streaming/src/main/java/org/apache/kylin/streaming/MicroBatchCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/MicroBatchCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/MicroBatchCondition.java
deleted file mode 100644
index baf7b04..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/MicroBatchCondition.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.apache.kylin.streaming;
-
-/**
- */
-public final class MicroBatchCondition {
-
- private final int batchSize;
- private final int batchInterval;
-
- public MicroBatchCondition(int batchSize, int batchInterval) {
- this.batchSize = batchSize;
- this.batchInterval = batchInterval;
- }
-
- public int getBatchSize() {
- return batchSize;
- }
-
- public int getBatchInterval() {
- return batchInterval;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index cb8dfb1..3da4d79 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -52,7 +52,6 @@ import java.util.concurrent.*;
public class StreamBuilder implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class);
- private final long startTimestamp;
private StreamParser streamParser = StringStreamParser.instance;
@@ -60,26 +59,49 @@ public class StreamBuilder implements Runnable {
private final MicroStreamBatchConsumer consumer;
- private final MicroBatchCondition condition;
-
private final String streaming;
- public StreamBuilder(String streaming, List<BlockingQueue<StreamMessage>> inputs, MicroBatchCondition condition, MicroStreamBatchConsumer consumer, long startTimestamp) {
+ private final long startTimestamp;
+
+ private final long batchInterval;
+
+ private final int batchSize;
+
+ public static final StreamBuilder newPeriodicalStreamBuilder(String streaming,
+ List<BlockingQueue<StreamMessage>> inputs,
+ MicroStreamBatchConsumer consumer,
+ long startTimestamp,
+ long batchInterval) {
+ return new StreamBuilder(streaming, inputs, consumer, startTimestamp, batchInterval);
+ }
+
+ public static final StreamBuilder newLimitedSizeStreamBuilder(String streaming,
+ BlockingQueue<StreamMessage> input,
+ MicroStreamBatchConsumer consumer,
+ long startTimestamp,
+ int batchSize) {
+ return new StreamBuilder(streaming, input, consumer, startTimestamp, batchSize);
+ }
+
+
+ private StreamBuilder(String streaming, List<BlockingQueue<StreamMessage>> inputs, MicroStreamBatchConsumer consumer, long startTimestamp, long batchInterval) {
Preconditions.checkArgument(inputs.size() > 0);
this.streaming = streaming;
this.streamMessageQueues = Lists.newArrayList();
this.consumer = Preconditions.checkNotNull(consumer);
- this.condition = condition;
this.startTimestamp = startTimestamp;
+ this.batchInterval = batchInterval;
+ this.batchSize = -1;
init(inputs);
}
- public StreamBuilder(String streaming, BlockingQueue<StreamMessage> input, MicroBatchCondition condition, MicroStreamBatchConsumer consumer, long startTimestamp) {
+ private StreamBuilder(String streaming, BlockingQueue<StreamMessage> input, MicroStreamBatchConsumer consumer, long startTimestamp, int batchSize) {
this.streaming = streaming;
this.streamMessageQueues = Lists.newArrayList();
this.consumer = Preconditions.checkNotNull(consumer);
- this.condition = condition;
this.startTimestamp = startTimestamp;
+ this.batchInterval = -1L;
+ this.batchSize = batchSize;
init(Preconditions.checkNotNull(input));
}
@@ -91,6 +113,14 @@ public class StreamBuilder implements Runnable {
this.streamMessageQueues.addAll(inputs);
}
+ private BatchCondition generateBatchCondition(long startTimestamp) {
+ if (batchInterval > 0) {
+ return new TimePeriodCondition(startTimestamp, startTimestamp + batchInterval);
+ } else {
+ return new LimitedSizeCondition(batchSize);
+ }
+ }
+
@Override
public void run() {
try {
@@ -106,7 +136,7 @@ public class StreamBuilder implements Runnable {
ArrayList<Future<MicroStreamBatch>> futures = Lists.newArrayListWithExpectedSize(inputCount);
int partitionId = 0;
for (BlockingQueue<StreamMessage> streamMessageQueue : streamMessageQueues) {
- futures.add(executorService.submit(new StreamFetcher(partitionId++, streamMessageQueue, countDownLatch, start, start + condition.getBatchInterval())));
+ futures.add(executorService.submit(new StreamFetcher(partitionId++, streamMessageQueue, countDownLatch, generateBatchCondition(start))));
}
countDownLatch.await();
ArrayList<MicroStreamBatch> batches = Lists.newArrayListWithExpectedSize(inputCount);
@@ -125,9 +155,9 @@ public class StreamBuilder implements Runnable {
batch = MicroStreamBatch.union(batch, batches.get(i));
}
}
- batch.getTimestamp().setFirst(start);
- batch.getTimestamp().setSecond(start + condition.getBatchInterval());
- start += condition.getBatchInterval();
+ if (batchInterval > 0) {
+ start += batchInterval;
+ }
if (batch.size() == 0) {
logger.info("nothing to build, skip to next iteration after sleeping 10s");
@@ -163,15 +193,13 @@ public class StreamBuilder implements Runnable {
private final BlockingQueue<StreamMessage> streamMessageQueue;
private final CountDownLatch countDownLatch;
private final int partitionId;
- private long startTimestamp;
- private long endTimestamp;
+ private final BatchCondition condition;
- public StreamFetcher(int partitionId, BlockingQueue<StreamMessage> streamMessageQueue, CountDownLatch countDownLatch, long startTimestamp, long endTimestamp) {
+ public StreamFetcher(int partitionId, BlockingQueue<StreamMessage> streamMessageQueue, CountDownLatch countDownLatch, BatchCondition condition) {
this.partitionId = partitionId;
this.streamMessageQueue = streamMessageQueue;
this.countDownLatch = countDownLatch;
- this.startTimestamp = startTimestamp;
- this.endTimestamp = endTimestamp;
+ this.condition = condition;
}
private void clearCounter() {
@@ -228,22 +256,18 @@ public class StreamBuilder implements Runnable {
throw new RuntimeException("parsedStreamMessage of " + new String(streamMessage.getRawData()) + " is null");
}
- final long timestamp = parsedStreamMessage.getTimestamp();
- if (timestamp < startTimestamp) {
- //TODO properly handle late megs
- streamMessageQueue.take();
- } else if (timestamp < endTimestamp) {
- streamMessageQueue.take();
- if (parsedStreamMessage.isAccepted()) {
+ final BatchCondition.Result result = condition.apply(parsedStreamMessage);
+ if (parsedStreamMessage.isAccepted()) {
+ if (result == BatchCondition.Result.ACCEPT) {
+ streamMessageQueue.take();
microStreamBatch.add(parsedStreamMessage);
- if (microStreamBatch.size() >= condition.getBatchSize()) {
- return microStreamBatch;
- }
- } else {
- //ignore pruned stream message
+ } else if (result == BatchCondition.Result.DISCARD) {
+ streamMessageQueue.take();
+ } else if (result == BatchCondition.Result.REJECT) {
+ return microStreamBatch;
}
} else {
- return microStreamBatch;
+ streamMessageQueue.take();
}
}
} catch (Exception e) {
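
The rewritten fetch loop above replaces the inline timestamp checks with a single three-way dispatch on the condition's verdict. A condensed paraphrase (the helper is hypothetical; note that, as in the diff, the condition is evaluated before the pruning check, and pruned messages are consumed and dropped whatever the verdict):

package org.apache.kylin.streaming;

import java.util.concurrent.BlockingQueue;

public class DispatchSketch {
    // Returns true when the current micro batch should be closed.
    static boolean dispatch(BatchCondition condition, ParsedStreamMessage msg,
                            BlockingQueue<StreamMessage> queue,
                            MicroStreamBatch batch) throws InterruptedException {
        BatchCondition.Result result = condition.apply(msg);
        if (!msg.isAccepted()) {
            queue.take();               // pruned by the parser: consume and drop
            return false;
        }
        switch (result) {
        case ACCEPT:
            queue.take();               // consume and add to the batch
            batch.add(msg);
            return false;
        case DISCARD:
            queue.take();               // e.g. a message before the time window
            return false;
        default:                        // REJECT: leave it queued, close the batch
            return true;
        }
    }
}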
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
new file mode 100644
index 0000000..fdd35fc
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
@@ -0,0 +1,30 @@
+package org.apache.kylin.streaming;
+
+/**
+ */
+public class TimePeriodCondition implements BatchCondition {
+
+ private final long startTime;
+ private final long endTime;
+
+ public TimePeriodCondition(long startTime, long endTime) {
+ this.startTime = startTime;
+ this.endTime = endTime;
+
+ }
+ @Override
+ public Result apply(ParsedStreamMessage message) {
+ if (message.getTimestamp() < startTime) {
+ return Result.DISCARD;
+ } else if (message.getTimestamp() < endTime) {
+ return Result.ACCEPT;
+ } else {
+ return Result.REJECT;
+ }
+ }
+
+ @Override
+ public BatchCondition copy() {
+ return this;
+ }
+}
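
With MicroBatchCondition removed, callers choose the batching behavior through the two factory methods. A minimal usage sketch (the streaming and cube names and the numeric values are made-up; the factory signatures are the ones from the StreamBuilder diff above):

import com.google.common.collect.Lists;
import org.apache.kylin.job.streaming.CubeStreamConsumer;
import org.apache.kylin.streaming.MicroStreamBatchConsumer;
import org.apache.kylin.streaming.StreamBuilder;
import org.apache.kylin.streaming.StreamMessage;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;

public class StreamBuilderFactorySketch {
    public static void main(String[] args) {
        BlockingQueue<StreamMessage> queue = new LinkedBlockingDeque<>();
        List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
        queues.add(queue);
        MicroStreamBatchConsumer consumer = new CubeStreamConsumer("my_cube");

        // Time-driven batches: close a micro batch every five minutes.
        StreamBuilder periodic = StreamBuilder.newPeriodicalStreamBuilder(
                "my_streaming", queues, consumer, System.currentTimeMillis(), 5 * 60 * 1000L);
        Executors.newSingleThreadExecutor().submit(periodic);

        // Size-driven batches: close a micro batch every 50,000 accepted messages.
        StreamBuilder sized = StreamBuilder.newLimitedSizeStreamBuilder(
                "my_streaming", queue, consumer, 0L, 50000);
        Executors.newSingleThreadExecutor().submit(sized);
    }
}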