You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/02 01:23:23 UTC

[incubator-inlong] branch master updated: [INLONG-2384][SDK] Sort-sdk support Tube consumer of PB compression cache message protocol (#2713)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new aa100b7  [INLONG-2384][SDK] Sort-sdk support Tube consumer of PB compression cache message protocol (#2713)
aa100b7 is described below

commit aa100b7b5cbe32f9f34947c1dcb8125e1af5f98d
Author: wardli <95...@users.noreply.github.com>
AuthorDate: Wed Mar 2 09:23:15 2022 +0800

    [INLONG-2384][SDK] Sort-sdk support Tube consumer of PB compression cache message protocol (#2713)
---
 .../sort/impl/pulsar/InLongPulsarFetcherImpl.java  |  2 +-
 .../sdk/sort/impl/tube/InLongTubeFetcherImpl.java  | 55 ++++++++----
 .../apache/inlong/sdk/sort/util/StringUtil.java    | 11 +++
 .../{ => pulsar}/InLongPulsarFetcherImplTest.java  |  4 +-
 .../sort/impl/tube/InLongTubeFetcherImplTest.java  | 99 ++++++++++++++++++++++
 5 files changed, 151 insertions(+), 20 deletions(-)

diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
index 51a98e3..9c307cf 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
@@ -152,7 +152,7 @@ public class InLongPulsarFetcherImpl extends InLongTopicFetcher {
                     .receiverQueueSize(context.getConfig().getPulsarReceiveQueueSize())
                     .subscribe();
 
-            String threadName = "sort_sdk_fetch_thread_" + StringUtil.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS");
+            String threadName = "sort_sdk_fetch_thread_" + StringUtil.formatDate(new Date());
             this.fetchThread = new Thread(new Fetcher(), threadName);
             this.fetchThread.start();
         } catch (Exception e) {
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java
index 63abb3f..7c196f8 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java
@@ -73,8 +73,7 @@ public class InLongTubeFetcherImpl extends InLongTopicFetcher {
                 messageConsumer.subscribe(inLongTopic.getTopic(), filters);
                 messageConsumer.completeSubscribe();
 
-                String threadName = "sort_sdk_fetch_thread_" + StringUtil
-                        .formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS");
+                String threadName = "sort_sdk_fetch_thread_" + StringUtil.formatDate(new Date());
                 this.fetchThread = new Thread(new Fetcher(), threadName);
                 this.fetchThread.start();
             } else {
@@ -90,23 +89,28 @@ public class InLongTubeFetcherImpl extends InLongTopicFetcher {
     @Override
     public void ack(String msgOffset) throws Exception {
         if (!StringUtils.isEmpty(msgOffset)) {
-            try {
-                if (messageConsumer == null) {
-                    context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
-                            inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
-                            .addAckFailTimes(1L);
-                    LOG.error("consumer == null");
-                    return;
-                }
+            if (messageConsumer == null) {
+                context.getStatManager()
+                        .getStatistics(context.getConfig().getSortTaskId(),
+                                inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                        .addAckFailTimes(1L);
+                LOG.warn("consumer == null");
+                return;
+            }
 
+            try {
                 ConsumerResult consumerResult = messageConsumer.confirmConsume(msgOffset, true);
                 int errCode = consumerResult.getErrCode();
                 if (TErrCodeConstants.SUCCESS != errCode) {
-                    context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
-                            inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()).addAckFailTimes(1L);
+                    context.getStatManager()
+                            .getStatistics(context.getConfig().getSortTaskId(),
+                                    inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                            .addAckFailTimes(1L);
                 } else {
-                    context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
-                            inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()).addAckSuccTimes(1L);
+                    context.getStatManager()
+                            .getStatistics(context.getConfig().getSortTaskId(),
+                                    inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                            .addAckSuccTimes(1L);
                 }
             } catch (Exception e) {
                 context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
@@ -129,7 +133,6 @@ public class InLongTubeFetcherImpl extends InLongTopicFetcher {
 
     @Override
     public boolean close() {
-        this.closed = true;
         try {
             if (fetchThread != null) {
                 fetchThread.interrupt();
@@ -139,6 +142,8 @@ public class InLongTubeFetcherImpl extends InLongTopicFetcher {
             }
         } catch (Throwable throwable) {
             throwable.printStackTrace();
+        } finally {
+            this.closed = true;
         }
         LOG.info("closed {}", inLongTopic);
         return true;
@@ -174,6 +179,15 @@ public class InLongTubeFetcherImpl extends InLongTopicFetcher {
         return 0L;
     }
 
+    /**
+     * isValidState
+     */
+    public void isValidState() {
+        if (closed) {
+            throw new IllegalStateException(inLongTopic + " closed.");
+        }
+    }
+
     public class Fetcher implements Runnable {
 
         /**
@@ -261,11 +275,14 @@ public class InLongTubeFetcherImpl extends InLongTopicFetcher {
                     if (null != message && TErrCodeConstants.SUCCESS == message.getErrCode()) {
                         List<InLongMessage> msgs = new ArrayList<>();
                         for (Message msg : message.getMessageList()) {
-                            msgs.add(new InLongMessage(msg.getData(), getAttributeMap(msg.getAttribute())));
+                            List<InLongMessage> deserialize = deserializer
+                                    .deserialize(context, inLongTopic, getAttributeMap(msg.getAttribute()),
+                                            msg.getData());
+                            msgs.addAll(deserialize);
                             context.getStatManager()
                                     .getStatistics(context.getConfig().getSortTaskId(),
                                             inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
-                                    .addMsgCount(1L).addConsumeSize(msg.getData().length);
+                                    .addMsgCount(deserialize.size()).addConsumeSize(msg.getData().length);
                         }
 
                         handleAndCallbackMsg(new MessageRecord(inLongTopic.getTopicKey(), msgs,
@@ -294,6 +311,10 @@ public class InLongTubeFetcherImpl extends InLongTopicFetcher {
                         context.releaseRequestPermit();
                     }
                 }
+
+                if (closed) {
+                    break;
+                }
             }
         }
     }
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java
index 3ddaf54..ed0f3c8 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java
@@ -95,4 +95,15 @@ public class StringUtil {
         return sdf.format(date);
     }
 
+    /**
+     * use default time foramt
+     *
+     * @param date {@link Date}
+     * @return String
+     */
+    public static String formatDate(Date date) {
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+        return sdf.format(date);
+    }
+
 }
diff --git a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/InLongPulsarFetcherImplTest.java b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
similarity index 98%
rename from inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/InLongPulsarFetcherImplTest.java
rename to inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
index e0a650f..e8365d3 100644
--- a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/InLongPulsarFetcherImplTest.java
+++ b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sdk.sort.impl;
+package org.apache.inlong.sdk.sort.impl.pulsar;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
@@ -30,7 +30,7 @@ import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
 import org.apache.inlong.sdk.sort.api.SortClientConfig;
 import org.apache.inlong.sdk.sort.entity.CacheZoneCluster;
 import org.apache.inlong.sdk.sort.entity.InLongTopic;
-import org.apache.inlong.sdk.sort.impl.pulsar.InLongPulsarFetcherImpl;
+import org.apache.inlong.sdk.sort.impl.ClientContextImpl;
 import org.apache.inlong.sdk.sort.stat.SortClientStateCounter;
 import org.apache.inlong.sdk.sort.stat.StatManager;
 import org.apache.pulsar.client.api.Consumer;
diff --git a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImplTest.java b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImplTest.java
new file mode 100644
index 0000000..9ef2762
--- /dev/null
+++ b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImplTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.impl.tube;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
+import org.apache.inlong.sdk.sort.api.SortClientConfig;
+import org.apache.inlong.sdk.sort.entity.CacheZoneCluster;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.impl.ClientContextImpl;
+import org.apache.inlong.sdk.sort.stat.SortClientStateCounter;
+import org.apache.inlong.sdk.sort.stat.StatManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.powermock.api.mockito.PowerMockito;
+
+public class InLongTubeFetcherImplTest {
+
+    private ClientContext clientContext;
+    private InLongTopic inLongTopic;
+    private SortClientConfig sortClientConfig;
+    private StatManager statManager;
+
+    /**
+     * setUp
+     */
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString());
+
+        inLongTopic = new InLongTopic();
+        inLongTopic.setTopic("testTopic");
+        inLongTopic.setPartitionId(0);
+        inLongTopic.setTopicType("pulsar");
+
+        CacheZoneCluster cacheZoneCluster = new CacheZoneCluster("clusterId", "bootstraps", "token");
+        inLongTopic.setInLongCluster(cacheZoneCluster);
+        clientContext = PowerMockito.mock(ClientContextImpl.class);
+
+        sortClientConfig = PowerMockito.mock(SortClientConfig.class);
+        statManager = PowerMockito.mock(StatManager.class);
+
+        when(clientContext.getConfig()).thenReturn(sortClientConfig);
+        when(clientContext.getStatManager()).thenReturn(statManager);
+        SortClientStateCounter sortClientStateCounter = new SortClientStateCounter("sortTaskId",
+                cacheZoneCluster.getClusterId(),
+                inLongTopic.getTopic(), 0);
+        when(statManager.getStatistics(anyString(), anyString(), anyString())).thenReturn(sortClientStateCounter);
+        when(sortClientConfig.getSortTaskId()).thenReturn("sortTaskId");
+
+    }
+
+    @Test
+    public void isValidState() {
+        InLongTubeFetcherImpl inLongTopicFetcher = new InLongTubeFetcherImpl(inLongTopic, clientContext);
+        inLongTopicFetcher.isValidState();
+    }
+
+    @Test
+    public void pause() {
+        InLongTubeFetcherImpl inLongTopicFetcher = new InLongTubeFetcherImpl(inLongTopic, clientContext);
+        inLongTopicFetcher.pause();
+    }
+
+    @Test
+    public void resume() {
+        InLongTubeFetcherImpl inLongTopicFetcher = new InLongTubeFetcherImpl(inLongTopic, clientContext);
+        inLongTopicFetcher.resume();
+    }
+
+    @Test
+    public void close() {
+        InLongTopicFetcher inLongTopicFetcher = new InLongTubeFetcherImpl(inLongTopic, clientContext);
+        boolean close = inLongTopicFetcher.close();
+        Assert.assertTrue(close);
+
+        boolean closed = inLongTopicFetcher.isClosed();
+        Assert.assertTrue(closed);
+    }
+}
\ No newline at end of file