You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/04/04 12:51:47 UTC
[incubator-iotdb] branch master updated: [IOTDB-576] Use
SessionPool for Flink Connector instead of Session (#982)
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4e288af [IOTDB-576] Use SessionPool for Flink Connector instead of Session (#982)
4e288af is described below
commit 4e288af514f2b6d802fab685538c06384a338d2a
Author: Xin Wang <xi...@apache.org>
AuthorDate: Sat Apr 4 20:51:39 2020 +0800
[IOTDB-576] Use SessionPool for Flink Connector instead of Session (#982)
* [IOTDB-576] Use SessionPool for Flink Connector instead of Session
---
.../java/org/apache/iotdb/flink/IoTDBSink.java | 32 +++++++++++++---------
.../iotdb/flink/IoTDBSinkBatchInsertTest.java | 22 +++++++--------
.../iotdb/flink/IoTDBSinkBatchTimerTest.java | 14 +++++-----
.../apache/iotdb/flink/IoTDBSinkInsertTest.java | 12 ++++----
4 files changed, 43 insertions(+), 37 deletions(-)
diff --git a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
index 601bef2..eebc3ce 100644
--- a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
+++ b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.flink;
import com.google.common.base.Preconditions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,12 +44,13 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> {
private IoTDBOptions options;
private IoTSerializationSchema<IN> serializationSchema;
private Map<String, IoTDBOptions.TimeseriesOption> timeseriesOptionMap;
- private transient Session session;
+ private transient SessionPool pool;
private transient ScheduledExecutorService scheduledExecutor;
private int batchSize = 0;
private int flushIntervalMs = 3000;
private List<Event> batchList;
+ private int sessionPoolSize = 2;
public IoTDBSink(IoTDBOptions options, IoTSerializationSchema<IN> schema) {
this.options = options;
@@ -68,13 +69,12 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> {
}
void initSession() throws Exception {
- session = new Session(options.getHost(), options.getPort(), options.getUser(), options.getPassword());
- session.open();
+ pool = new SessionPool(options.getHost(), options.getPort(), options.getUser(), options.getPassword(), sessionPoolSize);
- session.setStorageGroup(options.getStorageGroup());
+ pool.setStorageGroup(options.getStorageGroup());
for (IoTDBOptions.TimeseriesOption option : options.getTimeseriesOptionList()) {
- if (!session.checkTimeseriesExists(option.getPath())) {
- session.createTimeseries(option.getPath(), option.getDataType(), option.getEncoding(), option.getCompressor());
+ if (!pool.checkTimeseriesExists(option.getPath())) {
+ pool.createTimeseries(option.getPath(), option.getDataType(), option.getEncoding(), option.getCompressor());
}
}
}
@@ -93,8 +93,8 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> {
}
// for testing
- void setSession(Session session) {
- this.session = session;
+ void setSessionPool(SessionPool pool) {
+ this.pool = pool;
}
@Override
@@ -115,7 +115,7 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> {
}
convertText(event.getDevice(), event.getMeasurements(), event.getValues());
- session.insert(event.getDevice(), event.getTimestamp(), event.getMeasurements(),
+ pool.insert(event.getDevice(), event.getTimestamp(), event.getMeasurements(),
event.getValues());
LOG.debug("send event successfully");
}
@@ -132,15 +132,21 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> {
return this;
}
+ public IoTDBSink<IN> withSessionPoolSize(int sessionPoolSize) {
+ Preconditions.checkArgument(sessionPoolSize > 0);
+ this.sessionPoolSize = sessionPoolSize;
+ return this;
+ }
+
@Override
public void close() throws Exception {
- if (session != null) {
+ if (pool != null) {
try {
flush();
} catch (Exception e) {
LOG.error("flush error", e);
}
- session.close();
+ pool.close();
}
if (scheduledExecutor != null) {
scheduledExecutor.shutdown();
@@ -176,7 +182,7 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> {
measurementsList.add(event.getMeasurements());
valuesList.add(event.getValues());
}
- session.insertInBatch(deviceIds, timestamps, measurementsList, valuesList);
+ pool.insertInBatch(deviceIds, timestamps, measurementsList, valuesList);
LOG.debug("send event successfully");
batchList.clear();
}
diff --git a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
index 3692377..8a83cff 100644
--- a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
+++ b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.flink;
import com.google.common.collect.Lists;
-import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.pool.SessionPool;
import org.junit.Before;
import org.junit.Test;
@@ -33,7 +33,7 @@ import static org.mockito.Mockito.*;
public class IoTDBSinkBatchInsertTest {
private IoTDBSink ioTDBSink;
- private Session session;
+ private SessionPool pool;
@Before
public void setUp() throws Exception {
@@ -42,8 +42,8 @@ public class IoTDBSinkBatchInsertTest {
ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema());
ioTDBSink.withBatchSize(3);
- session = mock(Session.class);
- ioTDBSink.setSession(session);
+ pool = mock(SessionPool.class);
+ ioTDBSink.setSessionPool(pool);
}
@Test
@@ -55,7 +55,7 @@ public class IoTDBSinkBatchInsertTest {
tuple.put("values", "36.5");
ioTDBSink.invoke(tuple, null);
- verifyZeroInteractions(session);
+ verifyZeroInteractions(pool);
tuple = new HashMap();
tuple.put("device", "root.sg.D01");
@@ -64,7 +64,7 @@ public class IoTDBSinkBatchInsertTest {
tuple.put("values", "37.2");
ioTDBSink.invoke(tuple, null);
- verifyZeroInteractions(session);
+ verifyZeroInteractions(pool);
tuple = new HashMap();
tuple.put("device", "root.sg.D01");
@@ -73,7 +73,7 @@ public class IoTDBSinkBatchInsertTest {
tuple.put("values", "37.1");
ioTDBSink.invoke(tuple, null);
- verify(session).insertInBatch(any(List.class), any(List.class), any(List.class), any(List.class));
+ verify(pool).insertInBatch(any(List.class), any(List.class), any(List.class), any(List.class));
tuple = new HashMap();
tuple.put("device", "root.sg.D01");
@@ -82,7 +82,7 @@ public class IoTDBSinkBatchInsertTest {
tuple.put("values", "36.5");
ioTDBSink.invoke(tuple, null);
- verifyZeroInteractions(session);
+ verifyZeroInteractions(pool);
}
@Test
@@ -93,10 +93,10 @@ public class IoTDBSinkBatchInsertTest {
tuple.put("measurements", "temperature");
tuple.put("values", "36.5");
ioTDBSink.invoke(tuple, null);
- verifyZeroInteractions(session);
+ verifyZeroInteractions(pool);
ioTDBSink.close();
- verify(session).insertInBatch(any(List.class), any(List.class), any(List.class), any(List.class));
- verify(session).close();
+ verify(pool).insertInBatch(any(List.class), any(List.class), any(List.class), any(List.class));
+ verify(pool).close();
}
}
diff --git a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
index 226e798..c4112ae 100644
--- a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
+++ b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.flink;
import com.google.common.collect.Lists;
-import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.pool.SessionPool;
import org.junit.Before;
import org.junit.Test;
@@ -33,7 +33,7 @@ import static org.mockito.Mockito.*;
public class IoTDBSinkBatchTimerTest {
private IoTDBSink ioTDBSink;
- private Session session;
+ private SessionPool pool;
@Before
public void setUp() throws Exception {
@@ -44,8 +44,8 @@ public class IoTDBSinkBatchTimerTest {
ioTDBSink.withFlushIntervalMs(1000);
ioTDBSink.initScheduler();
- session = mock(Session.class);
- ioTDBSink.setSession(session);
+ pool = mock(SessionPool.class);
+ ioTDBSink.setSessionPool(pool);
}
@Test
@@ -59,16 +59,16 @@ public class IoTDBSinkBatchTimerTest {
Thread.sleep(2500);
- verify(session).insertInBatch(any(List.class), any(List.class), any(List.class), any(List.class));
+ verify(pool).insertInBatch(any(List.class), any(List.class), any(List.class), any(List.class));
Thread.sleep(1000);
- verifyZeroInteractions(session);
+ verifyZeroInteractions(pool);
}
@Test
public void close() throws Exception {
ioTDBSink.close();
- verify(session).close();
+ verify(pool).close();
}
}
diff --git a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
index 06fe7f3..07a38f7 100644
--- a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
+++ b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.flink;
import com.google.common.collect.Lists;
-import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.pool.SessionPool;
import org.junit.Before;
import org.junit.Test;
@@ -34,7 +34,7 @@ import static org.mockito.Mockito.verify;
public class IoTDBSinkInsertTest {
private IoTDBSink ioTDBSink;
- private Session session;
+ private SessionPool pool;
@Before
public void setUp() throws Exception {
@@ -42,8 +42,8 @@ public class IoTDBSinkInsertTest {
options.setTimeseriesOptionList(Lists.newArrayList(new IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema());
- session = mock(Session.class);
- ioTDBSink.setSession(session);
+ pool = mock(SessionPool.class);
+ ioTDBSink.setSessionPool(pool);
}
@Test
@@ -55,12 +55,12 @@ public class IoTDBSinkInsertTest {
tuple.put("values", "36.5");
ioTDBSink.invoke(tuple, null);
- verify(session).insert(any(String.class), any(Long.class), any(List.class), any(List.class));
+ verify(pool).insert(any(String.class), any(Long.class), any(List.class), any(List.class));
}
@Test
public void close() throws Exception {
ioTDBSink.close();
- verify(session).close();
+ verify(pool).close();
}
}