You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/02 01:23:23 UTC
[incubator-inlong] branch master updated: [INLONG-2384][SDK] Sort-sdk support Tube consumer of PB compression cache message protocol (#2713)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new aa100b7 [INLONG-2384][SDK] Sort-sdk support Tube consumer of PB compression cache message protocol (#2713)
aa100b7 is described below
commit aa100b7b5cbe32f9f34947c1dcb8125e1af5f98d
Author: wardli <95...@users.noreply.github.com>
AuthorDate: Wed Mar 2 09:23:15 2022 +0800
[INLONG-2384][SDK] Sort-sdk support Tube consumer of PB compression cache message protocol (#2713)
---
.../sort/impl/pulsar/InLongPulsarFetcherImpl.java | 2 +-
.../sdk/sort/impl/tube/InLongTubeFetcherImpl.java | 55 ++++++++----
.../apache/inlong/sdk/sort/util/StringUtil.java | 11 +++
.../{ => pulsar}/InLongPulsarFetcherImplTest.java | 4 +-
.../sort/impl/tube/InLongTubeFetcherImplTest.java | 99 ++++++++++++++++++++++
5 files changed, 151 insertions(+), 20 deletions(-)
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
index 51a98e3..9c307cf 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
@@ -152,7 +152,7 @@ public class InLongPulsarFetcherImpl extends InLongTopicFetcher {
.receiverQueueSize(context.getConfig().getPulsarReceiveQueueSize())
.subscribe();
- String threadName = "sort_sdk_fetch_thread_" + StringUtil.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS");
+ String threadName = "sort_sdk_fetch_thread_" + StringUtil.formatDate(new Date());
this.fetchThread = new Thread(new Fetcher(), threadName);
this.fetchThread.start();
} catch (Exception e) {
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java
index 63abb3f..7c196f8 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java
@@ -73,8 +73,7 @@ public class InLongTubeFetcherImpl extends InLongTopicFetcher {
messageConsumer.subscribe(inLongTopic.getTopic(), filters);
messageConsumer.completeSubscribe();
- String threadName = "sort_sdk_fetch_thread_" + StringUtil
- .formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS");
+ String threadName = "sort_sdk_fetch_thread_" + StringUtil.formatDate(new Date());
this.fetchThread = new Thread(new Fetcher(), threadName);
this.fetchThread.start();
} else {
@@ -90,23 +89,28 @@ public class InLongTubeFetcherImpl extends InLongTopicFetcher {
@Override
public void ack(String msgOffset) throws Exception {
if (!StringUtils.isEmpty(msgOffset)) {
- try {
- if (messageConsumer == null) {
- context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
- inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
- .addAckFailTimes(1L);
- LOG.error("consumer == null");
- return;
- }
+ if (messageConsumer == null) {
+ context.getStatManager()
+ .getStatistics(context.getConfig().getSortTaskId(),
+ inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+ .addAckFailTimes(1L);
+ LOG.warn("consumer == null");
+ return;
+ }
+ try {
ConsumerResult consumerResult = messageConsumer.confirmConsume(msgOffset, true);
int errCode = consumerResult.getErrCode();
if (TErrCodeConstants.SUCCESS != errCode) {
- context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
- inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()).addAckFailTimes(1L);
+ context.getStatManager()
+ .getStatistics(context.getConfig().getSortTaskId(),
+ inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+ .addAckFailTimes(1L);
} else {
- context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
- inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()).addAckSuccTimes(1L);
+ context.getStatManager()
+ .getStatistics(context.getConfig().getSortTaskId(),
+ inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+ .addAckSuccTimes(1L);
}
} catch (Exception e) {
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
@@ -129,7 +133,6 @@ public class InLongTubeFetcherImpl extends InLongTopicFetcher {
@Override
public boolean close() {
- this.closed = true;
try {
if (fetchThread != null) {
fetchThread.interrupt();
@@ -139,6 +142,8 @@ public class InLongTubeFetcherImpl extends InLongTopicFetcher {
}
} catch (Throwable throwable) {
throwable.printStackTrace();
+ } finally {
+ this.closed = true;
}
LOG.info("closed {}", inLongTopic);
return true;
@@ -174,6 +179,15 @@ public class InLongTubeFetcherImpl extends InLongTopicFetcher {
return 0L;
}
+ /**
+ * Throws {@link IllegalStateException} if this fetcher has been closed; otherwise returns normally.
+ */
+ public void isValidState() {
+ if (closed) {
+ throw new IllegalStateException(inLongTopic + " closed.");
+ }
+ }
+
public class Fetcher implements Runnable {
/**
@@ -261,11 +275,14 @@ public class InLongTubeFetcherImpl extends InLongTopicFetcher {
if (null != message && TErrCodeConstants.SUCCESS == message.getErrCode()) {
List<InLongMessage> msgs = new ArrayList<>();
for (Message msg : message.getMessageList()) {
- msgs.add(new InLongMessage(msg.getData(), getAttributeMap(msg.getAttribute())));
+ List<InLongMessage> deserialize = deserializer
+ .deserialize(context, inLongTopic, getAttributeMap(msg.getAttribute()),
+ msg.getData());
+ msgs.addAll(deserialize);
context.getStatManager()
.getStatistics(context.getConfig().getSortTaskId(),
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
- .addMsgCount(1L).addConsumeSize(msg.getData().length);
+ .addMsgCount(deserialize.size()).addConsumeSize(msg.getData().length);
}
handleAndCallbackMsg(new MessageRecord(inLongTopic.getTopicKey(), msgs,
@@ -294,6 +311,10 @@ public class InLongTubeFetcherImpl extends InLongTopicFetcher {
context.releaseRequestPermit();
}
}
+
+ if (closed) {
+ break;
+ }
}
}
}
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java
index 3ddaf54..ed0f3c8 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java
@@ -95,4 +95,15 @@ public class StringUtil {
return sdf.format(date);
}
+ /**
+ * Formats a date using the default time format {@code yyyy-MM-dd HH:mm:ss.SSS}.
+ *
+ * @param date the {@link Date} to format
+ * @return the date rendered as a String in the default format
+ */
+ public static String formatDate(Date date) {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+ return sdf.format(date);
+ }
+
}
diff --git a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/InLongPulsarFetcherImplTest.java b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
similarity index 98%
rename from inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/InLongPulsarFetcherImplTest.java
rename to inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
index e0a650f..e8365d3 100644
--- a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/InLongPulsarFetcherImplTest.java
+++ b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sdk.sort.impl;
+package org.apache.inlong.sdk.sort.impl.pulsar;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
@@ -30,7 +30,7 @@ import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
import org.apache.inlong.sdk.sort.api.SortClientConfig;
import org.apache.inlong.sdk.sort.entity.CacheZoneCluster;
import org.apache.inlong.sdk.sort.entity.InLongTopic;
-import org.apache.inlong.sdk.sort.impl.pulsar.InLongPulsarFetcherImpl;
+import org.apache.inlong.sdk.sort.impl.ClientContextImpl;
import org.apache.inlong.sdk.sort.stat.SortClientStateCounter;
import org.apache.inlong.sdk.sort.stat.StatManager;
import org.apache.pulsar.client.api.Consumer;
diff --git a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImplTest.java b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImplTest.java
new file mode 100644
index 0000000..9ef2762
--- /dev/null
+++ b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImplTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.impl.tube;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
+import org.apache.inlong.sdk.sort.api.SortClientConfig;
+import org.apache.inlong.sdk.sort.entity.CacheZoneCluster;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.impl.ClientContextImpl;
+import org.apache.inlong.sdk.sort.stat.SortClientStateCounter;
+import org.apache.inlong.sdk.sort.stat.StatManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.powermock.api.mockito.PowerMockito;
+
+public class InLongTubeFetcherImplTest {
+
+ private ClientContext clientContext;
+ private InLongTopic inLongTopic;
+ private SortClientConfig sortClientConfig;
+ private StatManager statManager;
+
+ /**
+ * Builds the test topic and cluster, then mocks the client context, its config and its stat manager.
+ */
+ @Before
+ public void setUp() throws Exception {
+ System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString());
+
+ inLongTopic = new InLongTopic();
+ inLongTopic.setTopic("testTopic");
+ inLongTopic.setPartitionId(0);
+ inLongTopic.setTopicType("pulsar"); // NOTE(review): "pulsar" type in a Tube fetcher test - confirm this is intended
+
+ CacheZoneCluster cacheZoneCluster = new CacheZoneCluster("clusterId", "bootstraps", "token");
+ inLongTopic.setInLongCluster(cacheZoneCluster);
+ clientContext = PowerMockito.mock(ClientContextImpl.class);
+
+ sortClientConfig = PowerMockito.mock(SortClientConfig.class);
+ statManager = PowerMockito.mock(StatManager.class);
+
+ when(clientContext.getConfig()).thenReturn(sortClientConfig);
+ when(clientContext.getStatManager()).thenReturn(statManager);
+ SortClientStateCounter sortClientStateCounter = new SortClientStateCounter("sortTaskId",
+ cacheZoneCluster.getClusterId(),
+ inLongTopic.getTopic(), 0);
+ when(statManager.getStatistics(anyString(), anyString(), anyString())).thenReturn(sortClientStateCounter);
+ when(sortClientConfig.getSortTaskId()).thenReturn("sortTaskId");
+
+ }
+
+ @Test
+ public void isValidState() {
+ InLongTubeFetcherImpl inLongTopicFetcher = new InLongTubeFetcherImpl(inLongTopic, clientContext);
+ inLongTopicFetcher.isValidState();
+ }
+
+ @Test
+ public void pause() {
+ InLongTubeFetcherImpl inLongTopicFetcher = new InLongTubeFetcherImpl(inLongTopic, clientContext);
+ inLongTopicFetcher.pause();
+ }
+
+ @Test
+ public void resume() {
+ InLongTubeFetcherImpl inLongTopicFetcher = new InLongTubeFetcherImpl(inLongTopic, clientContext);
+ inLongTopicFetcher.resume();
+ }
+
+ @Test
+ public void close() {
+ InLongTopicFetcher inLongTopicFetcher = new InLongTubeFetcherImpl(inLongTopic, clientContext);
+ boolean close = inLongTopicFetcher.close();
+ Assert.assertTrue(close);
+
+ boolean closed = inLongTopicFetcher.isClosed();
+ Assert.assertTrue(closed);
+ }
+}
\ No newline at end of file