You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/12 10:55:56 UTC

[2/3] incubator-kylin git commit: KYLIN-826

KYLIN-826


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/db3aafcb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/db3aafcb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/db3aafcb

Branch: refs/heads/0.8
Commit: db3aafcb2233ccd55cc0f84606c7b5a92b4e8b69
Parents: c96a02c
Author: honma <ho...@ebay.com>
Authored: Fri Jun 12 15:39:34 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Jun 12 16:55:10 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/util/BasicTest.java |  9 +++
 .../kylin/job/streaming/KafkaDataLoader.java    | 44 ++++++++----
 .../apache/kylin/job/BasicLocalMetaTest.java    | 24 +++++++
 .../kylin/job/BuildCubeWithStreamTest.java      | 64 +++++++++--------
 .../streaming/StreamingTableDataGenerator.java  | 73 ++++++++++++++++++++
 .../broadcaster/BroadcasterReceiveServlet.java  | 10 +--
 .../kylin/streaming/KafkaClusterConfig.java     |  4 ++
 .../kylin/streaming/StreamingManager.java       | 12 +++-
 .../kylin/streaming/OneOffStreamProducer.java   |  1 +
 9 files changed, 191 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/db3aafcb/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 01a22c1..2f0590b 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.common.util;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.TreeMultiset;
 import org.apache.commons.configuration.ConfigurationException;
 import org.junit.Assert;
 import org.junit.Ignore;
@@ -62,6 +63,14 @@ public class BasicTest {
 
     @Test
     public void test0() throws Exception {
+        TreeMultiset<Long> xx = TreeMultiset.create();
+        xx.add(2L);
+        xx.add(1L);
+        xx.add(1L);
+        for(Long hi : xx)
+        {
+            System.out.println(hi);
+        }
         System.out.println(Long.MAX_VALUE);
 
         IdentityHashMap<String, Void> a = new IdentityHashMap<>();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/db3aafcb/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
index eb47d15..da2c711 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
@@ -1,34 +1,44 @@
 package org.apache.kylin.job.streaming;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
 import kafka.javaapi.producer.Producer;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.streaming.BrokerConfig;
+import org.apache.kylin.streaming.KafkaClusterConfig;
 import org.apache.kylin.streaming.StreamingConfig;
 import org.apache.kylin.streaming.StreamingManager;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Properties;
 
 /**
+ * Load prepared data into kafka(for test use)
  */
 public class KafkaDataLoader {
-    /**
-     *
-     * @param args args[0] data file path, args[1] streaming name
-     * @throws IOException
-     */
-    public static void main(String[] args) throws IOException {
-        StreamingManager streamingManager = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv());
-        StreamingConfig streamingConfig = streamingManager.getStreamingConfig(args[1]);
 
-        List<String> alldata = FileUtils.readLines(new File(args[0]));
+    public static void loadIntoKafka(String streamName, List<String> messages) {
+
+        StreamingManager streamingManager = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv());
+        StreamingConfig streamingConfig = streamingManager.getStreamingConfig(streamName);
 
+        KafkaClusterConfig clusterConfig = streamingConfig.getKafkaClusterConfigs().get(0);
+        String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() {
+            @Nullable
+            @Override
+            public String apply(BrokerConfig brokerConfig) {
+                return brokerConfig.getHost() + ":" + brokerConfig.getPort();
+            }
+        }), ",");
         Properties props = new Properties();
-        props.put("metadata.broker.list", "sandbox:6667");
+        props.put("metadata.broker.list", brokerList);
         props.put("serializer.class", "kafka.serializer.StringEncoder");
         props.put("request.required.acks", "1");
 
@@ -36,10 +46,20 @@ public class KafkaDataLoader {
 
         Producer<String, String> producer = new Producer<String, String>(config);
 
-        for (int i = 0; i < alldata.size(); ++i) {
-            KeyedMessage<String, String> data = new KeyedMessage<String, String>(streamingConfig.getTopic(), String.valueOf(i), alldata.get(i));
+        for (int i = 0; i < messages.size(); ++i) {
+            KeyedMessage<String, String> data = new KeyedMessage<String, String>(streamingConfig.getTopic(), String.valueOf(i), messages.get(i));
             producer.send(data);
         }
         producer.close();
     }
+
+    /**
+     *
+     * @param args args[0] data file path, args[1] streaming name
+     * @throws IOException
+     */
+    public static void main(String[] args) throws IOException {
+        List<String> alldata = FileUtils.readLines(new File(args[0]));
+        loadIntoKafka(args[1], alldata);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/db3aafcb/job/src/test/java/org/apache/kylin/job/BasicLocalMetaTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BasicLocalMetaTest.java b/job/src/test/java/org/apache/kylin/job/BasicLocalMetaTest.java
new file mode 100644
index 0000000..d034a73
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/BasicLocalMetaTest.java
@@ -0,0 +1,24 @@
+package org.apache.kylin.job;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ */
+public class BasicLocalMetaTest extends LocalFileMetadataTestCase {
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void basicTest() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/db3aafcb/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
index 9dca76a..ec94c56 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
@@ -34,18 +34,11 @@
 
 package org.apache.kylin.job;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import javax.annotation.Nullable;
-
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hive.hcatalog.data.schema.HCatSchema;
@@ -67,25 +60,30 @@ import org.apache.kylin.job.inmemcubing.InMemCubeBuilder;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.streaming.StreamingConfig;
+import org.apache.kylin.streaming.StreamingManager;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.SetMultimap;
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 /**
- *
- * This class is going to be deleted
+ *  for streaming cubing case "test_streaming_table"
  */
-@Ignore("For dev testing")
 public class BuildCubeWithStreamTest {
 
     private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStreamTest.class);
@@ -98,6 +96,7 @@ public class BuildCubeWithStreamTest {
         logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
         ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
         System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
+
     }
 
     @Before
@@ -108,6 +107,14 @@ public class BuildCubeWithStreamTest {
         kylinConfig = KylinConfig.getInstanceFromEnv();
         cubeManager = CubeManager.getInstance(kylinConfig);
 
+        //Use a random toplic for kafka data stream
+        StreamingConfig streamingConfig = StreamingManager.getInstance(kylinConfig).getStreamingConfig("test_streaming_table_table");
+        streamingConfig.setTopic(UUID.randomUUID().toString());
+        StreamingManager.getInstance(kylinConfig).saveStreamingConfig(streamingConfig);
+    }
+
+    private void loadKafkaData() {
+
     }
 
     @After
@@ -125,7 +132,7 @@ public class BuildCubeWithStreamTest {
         CubeSegment cubeSegment = cube.getSegment("19700101000000_20150401000000", SegmentStatusEnum.NEW);
         Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
 
-//
+        //
         for (DimensionDesc dim : desc.getDimensions()) {
             // dictionary
             for (TblColRef col : dim.getColumnRefs()) {
@@ -140,15 +147,14 @@ public class BuildCubeWithStreamTest {
             }
         }
 
-//        final String tableName = createIntermediateTable(desc, kylinConfig, null);
+        //        final String tableName = createIntermediateTable(desc, kylinConfig, null);
         String tableName = "kylin_intermediate_test_kylin_cube_without_slr_desc_19700101000000_20130112000000_a24dec89_efbd_425f_9a5f_8b78dd1412af"; // has 3089 records;
-//        tableName = "kylin_intermediate_test_kylin_cube_without_slr_desc_19700101000000_20130112000000_a5e1eb5d_da6b_475d_9807_be0b61f03215"; // only 20 rows;
-//        tableName = "kylin_intermediate_test_kylin_cube_without_slr_left_join_desc_19700101000000_20150302000000_0a183367_f245_43d1_8850_1c138c8514c3";
-//        tableName = "kylin_intermediate_test_kylin_cube_without_slr_left_join_desc_19700101000000_20150301000000_ce061464_7962_4642_bd7d_7c3d8fbe9389";
+        //        tableName = "kylin_intermediate_test_kylin_cube_without_slr_desc_19700101000000_20130112000000_a5e1eb5d_da6b_475d_9807_be0b61f03215"; // only 20 rows;
+        //        tableName = "kylin_intermediate_test_kylin_cube_without_slr_left_join_desc_19700101000000_20150302000000_0a183367_f245_43d1_8850_1c138c8514c3";
+        //        tableName = "kylin_intermediate_test_kylin_cube_without_slr_left_join_desc_19700101000000_20150301000000_ce061464_7962_4642_bd7d_7c3d8fbe9389";
         tableName = "kylin_intermediate_test_kylin_cube_without_slr_left_join_desc_19700101000000_20150401000000_fb7ae579_d987_4900_a3b7_c60c731cd269"; // 2 million records
         logger.info("intermediate table name:" + tableName);
 
-
         ArrayBlockingQueue queue = new ArrayBlockingQueue<List<String>>(10000);
 
         InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube.getDescriptor(), dictionaryMap, new ConsoleGTRecordWriter());
@@ -166,7 +172,7 @@ public class BuildCubeWithStreamTest {
             row = reader.getRowAsList();
             queue.put(row);
             counter++;
-            if(counter == 200000)
+            if (counter == 200000)
                 break;
         }
         queue.put(new ArrayList<String>(0));
@@ -182,7 +188,6 @@ public class BuildCubeWithStreamTest {
         logger.info("stream build finished");
     }
 
-
     private void buildDictionary(List<List<String>> table, CubeDesc desc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
         SetMultimap<TblColRef, String> valueMap = HashMultimap.create();
 
@@ -216,7 +221,6 @@ public class BuildCubeWithStreamTest {
 
     }
 
-
     class ConsoleGTRecordWriter implements ICuboidWriter {
 
         boolean verbose = false;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/db3aafcb/job/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java b/job/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
new file mode 100644
index 0000000..394b56f
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
@@ -0,0 +1,73 @@
+package org.apache.kylin.job.streaming;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SortedMultiset;
+import com.google.common.collect.TreeMultiset;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * this is for generating fact table data for test_streaming_table
+ */
+public class StreamingTableDataGenerator {
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamingTableDataGenerator.class);
+    private static final ObjectMapper mapper = new ObjectMapper();
+
+    public static List<String> generate(int recordCount, long startTime, long endTime)  {
+        Preconditions.checkArgument(startTime < endTime);
+        Preconditions.checkArgument(recordCount > 0);
+
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        TableDesc tableDesc = MetadataManager.getInstance(kylinConfig).getTableDesc("streaming_table");
+
+        SortedMultiset<Long> times = TreeMultiset.create();
+        Random r = new Random();
+        for (int i = 0; i < recordCount; i++) {
+            long t = startTime + (long) ((endTime - startTime) * r.nextDouble());
+            times.add(t);
+        }
+
+        List<String> ret = Lists.newArrayList();
+        HashMap<String, String> kvs = Maps.newHashMap();
+        for (long time : times) {
+            kvs.clear();
+            kvs.put("timestamp", String.valueOf(time));
+            for (ColumnDesc columnDesc : tableDesc.getColumns()) {
+                DataType dataType = columnDesc.getType();
+                if (dataType.isDateTimeFamily()) {
+                    continue;
+                } else if (dataType.isStringFamily()) {
+                    char c = (char) ('A' + (int) (26 * r.nextDouble()));
+                    kvs.put(columnDesc.getName(), String.valueOf(c));
+                } else if (dataType.isIntegerFamily()) {
+                    int v = r.nextInt(10000);
+                    kvs.put(columnDesc.getName(), String.valueOf(v));
+                } else if (dataType.isNumberFamily()) {
+                    String v = String.format("%.4f", r.nextDouble() * 100);
+                    kvs.put(columnDesc.getName(), v);
+                }
+            }
+            try {
+                ret.add(mapper.writeValueAsString(kvs));
+            } catch (JsonProcessingException e) {
+                logger.error("error!",e);
+            }
+        }
+
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/db3aafcb/server/src/test/java/org/apache/kylin/rest/broadcaster/BroadcasterReceiveServlet.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/broadcaster/BroadcasterReceiveServlet.java b/server/src/test/java/org/apache/kylin/rest/broadcaster/BroadcasterReceiveServlet.java
index ed63183..a68f67a 100644
--- a/server/src/test/java/org/apache/kylin/rest/broadcaster/BroadcasterReceiveServlet.java
+++ b/server/src/test/java/org/apache/kylin/rest/broadcaster/BroadcasterReceiveServlet.java
@@ -18,14 +18,13 @@
 
 package org.apache.kylin.rest.broadcaster;
 
-import java.io.IOException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  */
@@ -33,7 +32,7 @@ public class BroadcasterReceiveServlet extends HttpServlet {
 
     private static final long serialVersionUID = 1L;
 
-    public static interface BroadcasterHandler {
+    public interface BroadcasterHandler {
 
         void handle(String type, String name, String event);
     }
@@ -45,6 +44,7 @@ public class BroadcasterReceiveServlet extends HttpServlet {
     }
 
     private static final Pattern PATTERN = Pattern.compile("/(.+)/(.+)/(.+)");
+
     @Override
     protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
         handle(req, resp);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/db3aafcb/streaming/src/main/java/org/apache/kylin/streaming/KafkaClusterConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaClusterConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaClusterConfig.java
index ff34dd9..6a4b389 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaClusterConfig.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaClusterConfig.java
@@ -41,6 +41,10 @@ public class KafkaClusterConfig extends RootPersistentEntity {
         return streamingConfig.getMaxReadCount();
     }
 
+    public List<BrokerConfig> getBrokerConfigs() {
+        return brokerConfigs;
+    }
+
     public List<Broker> getBrokers() {
         return Lists.transform(brokerConfigs, new Function<BrokerConfig, Broker>() {
             @Nullable

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/db3aafcb/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
index 3552edc..b4ecb0e 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
@@ -72,7 +72,6 @@ public class StreamingManager {
         return ResourceStore.getStore(this.config);
     }
 
-
     public static StreamingManager getInstance(KylinConfig config) {
         StreamingManager r = CACHE.get(config);
         if (r != null) {
@@ -109,7 +108,6 @@ public class StreamingManager {
         return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json";
     }
 
-
     public boolean createOrUpdateKafkaConfig(String name, StreamingConfig config) {
         try {
             getStore().putResource(formatStreamingConfigPath(name), config, StreamingConfig.SERIALIZER);
@@ -129,6 +127,15 @@ public class StreamingManager {
         }
     }
 
+    public void saveStreamingConfig(StreamingConfig streamingConfig) throws IOException {
+        if (streamingConfig == null || StringUtils.isEmpty(streamingConfig.getName())) {
+            throw new IllegalArgumentException();
+        }
+
+        String path = formatStreamingConfigPath(streamingConfig.getName());
+        getStore().putResource(path, streamingConfig, StreamingConfig.SERIALIZER);
+    }
+
     public long getOffset(String streaming, int shard) {
         final String resPath = formatStreamingOutputPath(streaming, shard);
         try {
@@ -189,5 +196,4 @@ public class StreamingManager {
     private final ObjectMapper mapper = new ObjectMapper();
     private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(Integer.class), SimpleType.construct(Long.class));
 
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/db3aafcb/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java b/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
index 93da99f..3cafe3f 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
@@ -53,6 +53,7 @@ import java.io.IOException;
 import java.util.Properties;
 
 /**
+ * for tests
  */
 public class OneOffStreamProducer {