You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/04/04 12:51:47 UTC

[incubator-iotdb] branch master updated: [IOTDB-576] Use SessionPool for Flink Connector instead of Session (#982)

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e288af  [IOTDB-576] Use SessionPool for Flink Connector instead of Session (#982)
4e288af is described below

commit 4e288af514f2b6d802fab685538c06384a338d2a
Author: Xin Wang <xi...@apache.org>
AuthorDate: Sat Apr 4 20:51:39 2020 +0800

    [IOTDB-576] Use SessionPool for Flink Connector instead of Session (#982)
    
    * [IOTDB-576] Use SessionPool for Flink Connector instead of Session
---
 .../java/org/apache/iotdb/flink/IoTDBSink.java     | 32 +++++++++++++---------
 .../iotdb/flink/IoTDBSinkBatchInsertTest.java      | 22 +++++++--------
 .../iotdb/flink/IoTDBSinkBatchTimerTest.java       | 14 +++++-----
 .../apache/iotdb/flink/IoTDBSinkInsertTest.java    | 12 ++++----
 4 files changed, 43 insertions(+), 37 deletions(-)

diff --git a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
index 601bef2..eebc3ce 100644
--- a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
+++ b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.flink;
 import com.google.common.base.Preconditions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.pool.SessionPool;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,12 +44,13 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> {
     private IoTDBOptions options;
     private IoTSerializationSchema<IN> serializationSchema;
     private Map<String, IoTDBOptions.TimeseriesOption> timeseriesOptionMap;
-    private transient Session session;
+    private transient SessionPool pool;
     private transient ScheduledExecutorService scheduledExecutor;
 
     private int batchSize = 0;
     private int flushIntervalMs = 3000;
     private List<Event> batchList;
+    private int sessionPoolSize = 2;
 
     public IoTDBSink(IoTDBOptions options, IoTSerializationSchema<IN> schema) {
         this.options = options;
@@ -68,13 +69,12 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> {
     }
 
     void initSession() throws Exception {
-        session = new Session(options.getHost(), options.getPort(), options.getUser(), options.getPassword());
-        session.open();
+        pool = new SessionPool(options.getHost(), options.getPort(), options.getUser(), options.getPassword(), sessionPoolSize);
 
-        session.setStorageGroup(options.getStorageGroup());
+        pool.setStorageGroup(options.getStorageGroup());
         for (IoTDBOptions.TimeseriesOption option : options.getTimeseriesOptionList()) {
-            if (!session.checkTimeseriesExists(option.getPath())) {
-                session.createTimeseries(option.getPath(), option.getDataType(), option.getEncoding(), option.getCompressor());
+            if (!pool.checkTimeseriesExists(option.getPath())) {
+                pool.createTimeseries(option.getPath(), option.getDataType(), option.getEncoding(), option.getCompressor());
             }
         }
     }
@@ -93,8 +93,8 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> {
     }
 
     //  for testing
-    void setSession(Session session) {
-        this.session = session;
+    void setSessionPool(SessionPool pool) {
+        this.pool = pool;
     }
 
     @Override
@@ -115,7 +115,7 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> {
         }
 
         convertText(event.getDevice(), event.getMeasurements(), event.getValues());
-        session.insert(event.getDevice(), event.getTimestamp(), event.getMeasurements(),
+        pool.insert(event.getDevice(), event.getTimestamp(), event.getMeasurements(),
                 event.getValues());
         LOG.debug("send event successfully");
     }
@@ -132,15 +132,21 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> {
         return this;
     }
 
+    public IoTDBSink<IN> withSessionPoolSize(int sessionPoolSize) {
+        Preconditions.checkArgument(sessionPoolSize > 0);
+        this.sessionPoolSize = sessionPoolSize;
+        return this;
+    }
+
     @Override
     public void close() throws Exception {
-        if (session != null) {
+        if (pool != null) {
             try {
                 flush();
             } catch (Exception e) {
                 LOG.error("flush error", e);
             }
-            session.close();
+            pool.close();
         }
         if (scheduledExecutor != null) {
             scheduledExecutor.shutdown();
@@ -176,7 +182,7 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> {
                         measurementsList.add(event.getMeasurements());
                         valuesList.add(event.getValues());
                     }
-                    session.insertInBatch(deviceIds, timestamps, measurementsList, valuesList);
+                    pool.insertInBatch(deviceIds, timestamps, measurementsList, valuesList);
                     LOG.debug("send event successfully");
                     batchList.clear();
                 }
diff --git a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
index 3692377..8a83cff 100644
--- a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
+++ b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.flink;
 
 import com.google.common.collect.Lists;
-import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.pool.SessionPool;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -33,7 +33,7 @@ import static org.mockito.Mockito.*;
 public class IoTDBSinkBatchInsertTest {
 
     private IoTDBSink ioTDBSink;
-    private Session session;
+    private SessionPool pool;
 
     @Before
     public void setUp() throws Exception {
@@ -42,8 +42,8 @@ public class IoTDBSinkBatchInsertTest {
         ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema());
         ioTDBSink.withBatchSize(3);
 
-        session = mock(Session.class);
-        ioTDBSink.setSession(session);
+        pool = mock(SessionPool.class);
+        ioTDBSink.setSessionPool(pool);
     }
 
     @Test
@@ -55,7 +55,7 @@ public class IoTDBSinkBatchInsertTest {
         tuple.put("values", "36.5");
         ioTDBSink.invoke(tuple, null);
 
-        verifyZeroInteractions(session);
+        verifyZeroInteractions(pool);
 
         tuple = new HashMap();
         tuple.put("device", "root.sg.D01");
@@ -64,7 +64,7 @@ public class IoTDBSinkBatchInsertTest {
         tuple.put("values", "37.2");
         ioTDBSink.invoke(tuple, null);
 
-        verifyZeroInteractions(session);
+        verifyZeroInteractions(pool);
 
         tuple = new HashMap();
         tuple.put("device", "root.sg.D01");
@@ -73,7 +73,7 @@ public class IoTDBSinkBatchInsertTest {
         tuple.put("values", "37.1");
         ioTDBSink.invoke(tuple, null);
 
-        verify(session).insertInBatch(any(List.class), any(List.class), any(List.class), any(List.class));
+        verify(pool).insertInBatch(any(List.class), any(List.class), any(List.class), any(List.class));
 
         tuple = new HashMap();
         tuple.put("device", "root.sg.D01");
@@ -82,7 +82,7 @@ public class IoTDBSinkBatchInsertTest {
         tuple.put("values", "36.5");
         ioTDBSink.invoke(tuple, null);
 
-        verifyZeroInteractions(session);
+        verifyZeroInteractions(pool);
     }
 
     @Test
@@ -93,10 +93,10 @@ public class IoTDBSinkBatchInsertTest {
         tuple.put("measurements", "temperature");
         tuple.put("values", "36.5");
         ioTDBSink.invoke(tuple, null);
-        verifyZeroInteractions(session);
+        verifyZeroInteractions(pool);
 
         ioTDBSink.close();
-        verify(session).insertInBatch(any(List.class), any(List.class), any(List.class), any(List.class));
-        verify(session).close();
+        verify(pool).insertInBatch(any(List.class), any(List.class), any(List.class), any(List.class));
+        verify(pool).close();
     }
 }
diff --git a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
index 226e798..c4112ae 100644
--- a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
+++ b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.flink;
 
 import com.google.common.collect.Lists;
-import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.pool.SessionPool;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -33,7 +33,7 @@ import static org.mockito.Mockito.*;
 public class IoTDBSinkBatchTimerTest {
 
     private IoTDBSink ioTDBSink;
-    private Session session;
+    private SessionPool pool;
 
     @Before
     public void setUp() throws Exception {
@@ -44,8 +44,8 @@ public class IoTDBSinkBatchTimerTest {
         ioTDBSink.withFlushIntervalMs(1000);
         ioTDBSink.initScheduler();
 
-        session = mock(Session.class);
-        ioTDBSink.setSession(session);
+        pool = mock(SessionPool.class);
+        ioTDBSink.setSessionPool(pool);
     }
 
     @Test
@@ -59,16 +59,16 @@ public class IoTDBSinkBatchTimerTest {
 
         Thread.sleep(2500);
 
-        verify(session).insertInBatch(any(List.class), any(List.class), any(List.class), any(List.class));
+        verify(pool).insertInBatch(any(List.class), any(List.class), any(List.class), any(List.class));
 
         Thread.sleep(1000);
 
-        verifyZeroInteractions(session);
+        verifyZeroInteractions(pool);
     }
 
     @Test
     public void close() throws Exception {
         ioTDBSink.close();
-        verify(session).close();
+        verify(pool).close();
     }
 }
diff --git a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
index 06fe7f3..07a38f7 100644
--- a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
+++ b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.flink;
 
 import com.google.common.collect.Lists;
-import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.pool.SessionPool;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -34,7 +34,7 @@ import static org.mockito.Mockito.verify;
 public class IoTDBSinkInsertTest {
 
     private IoTDBSink ioTDBSink;
-    private Session session;
+    private SessionPool pool;
 
     @Before
     public void setUp() throws Exception {
@@ -42,8 +42,8 @@ public class IoTDBSinkInsertTest {
         options.setTimeseriesOptionList(Lists.newArrayList(new IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
         ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema());
 
-        session = mock(Session.class);
-        ioTDBSink.setSession(session);
+        pool = mock(SessionPool.class);
+        ioTDBSink.setSessionPool(pool);
     }
 
     @Test
@@ -55,12 +55,12 @@ public class IoTDBSinkInsertTest {
         tuple.put("values", "36.5");
 
         ioTDBSink.invoke(tuple, null);
-        verify(session).insert(any(String.class), any(Long.class), any(List.class), any(List.class));
+        verify(pool).insert(any(String.class), any(Long.class), any(List.class), any(List.class));
     }
 
     @Test
     public void close() throws Exception {
         ioTDBSink.close();
-        verify(session).close();
+        verify(pool).close();
     }
 }