You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/02 09:14:20 UTC

[inlong] branch release-1.3.0 updated (2cbb2c796 -> 8149441fb)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


    from 2cbb2c796 [INLONG-5761][Dashboard] Add agent type to cluster management (#5764)
     new ad81c2d2f [INLONG-5762][Sort] Fix the computing for the Pulsar source metric (#5763)
     new 8149441fb [INLONG-5767][Sort] Fix IOUtils interface is incompatible in JDK 17 (#5768)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../table/DynamicPulsarDeserializationSchema.java  | 79 ++++++++++++++++++----
 .../pulsar/withoutadmin/CallbackCollector.java     | 31 +++++----
 .../sort/tests/utils/FlinkContainerTestEnv.java    |  8 ++-
 3 files changed, 88 insertions(+), 30 deletions(-)
 copy inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/plugin/JarHellTest.java => inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/CallbackCollector.java (59%)


[inlong] 02/02: [INLONG-5767][Sort] Fix IOUtils interface is incompatible in JDK 17 (#5768)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 8149441fb74e18afda89e18b60301df8899f66dd
Author: xuesongxs <54...@users.noreply.github.com>
AuthorDate: Fri Sep 2 15:20:26 2022 +0800

    [INLONG-5767][Sort] Fix IOUtils interface is incompatible in JDK 17 (#5768)
---
 .../org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
index 914ac7285..bfe3bc6b1 100644
--- a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
+++ b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
@@ -18,6 +18,7 @@
 
 package org.apache.inlong.sort.tests.utils;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.client.deployment.StandaloneClusterId;
@@ -42,7 +43,6 @@ import org.testcontainers.containers.Network;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.images.builder.Transferable;
 import org.testcontainers.lifecycle.Startables;
-import sun.misc.IOUtils;
 
 import javax.annotation.Nullable;
 import java.io.File;
@@ -283,7 +283,7 @@ public abstract class FlinkContainerTestEnv extends TestLogger {
             jarFile.stream().forEach(entry -> {
                 try (InputStream is = jarFile.getInputStream(entry)) {
                     jos.putNextEntry(entry);
-                    jos.write(IOUtils.readNBytes(is, is.available()));
+                    jos.write(IOUtils.toByteArray(is));
                     jos.closeEntry();
                 } catch (IOException e) {
                     throw new RuntimeException(e);
@@ -293,8 +293,10 @@ public abstract class FlinkContainerTestEnv extends TestLogger {
             for (Path jar : jars) {
                 try (InputStream is = new FileInputStream(jar.toFile())) {
                     jos.putNextEntry(new JarEntry("lib/" + jar.getFileName().toString()));
-                    jos.write(IOUtils.readNBytes(is, is.available()));
+                    jos.write(IOUtils.toByteArray(is));
                     jos.closeEntry();
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
                 }
             }
 


[inlong] 01/02: [INLONG-5762][Sort] Fix the computing for the Pulsar source metric (#5763)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit ad81c2d2f823051b914022df71722a059df3ac4d
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Fri Sep 2 14:14:03 2022 +0800

    [INLONG-5762][Sort] Fix the computing for the Pulsar source metric (#5763)
---
 .../table/DynamicPulsarDeserializationSchema.java  | 79 ++++++++++++++++++----
 .../pulsar/withoutadmin/CallbackCollector.java     | 47 +++++++++++++
 2 files changed, 111 insertions(+), 15 deletions(-)

diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
index 38a97bf5c..7522a2e43 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
@@ -18,10 +18,11 @@
 
 package org.apache.inlong.sort.pulsar.table;
 
-import java.util.Arrays;
-import java.util.HashSet;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableSource;
 import org.apache.flink.streaming.util.serialization.FlinkSchema;
 import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
@@ -35,14 +36,21 @@ import org.apache.flink.util.Preconditions;
 import org.apache.inlong.audit.AuditImp;
 import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.pulsar.withoutadmin.CallbackCollector;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
 import static org.apache.inlong.sort.base.Constants.DELIMITER;
 
 /**
@@ -118,7 +126,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
             inlongGroupId = inlongMetricArray[0];
             inlongStreamId = inlongMetricArray[1];
             String nodeId = inlongMetricArray[2];
-            sourceMetricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, context.getMetricGroup());
+            sourceMetricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, getMetricGroup(context));
             sourceMetricData.registerMetricsForNumBytesIn();
             sourceMetricData.registerMetricsForNumBytesInPerSecond();
             sourceMetricData.registerMetricsForNumRecordsIn();
@@ -132,6 +140,40 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
 
     }
 
+    /**
+     * Gets the metric group, reading the context's runtimeContext field via reflection when possible.
+     *
+     * @param context Contextual information that can be used during initialization.
+     * @return metric group that can be used to register new metrics with Flink and to create a nested hierarchy based
+     *         on the group names.
+     */
+    private MetricGroup getMetricGroup(DeserializationSchema.InitializationContext context)
+            throws NoSuchFieldException, IllegalAccessException {
+        MetricGroup metricGroup;
+        String className = "RuntimeContextDeserializationInitializationContextAdapter";
+        String fieldName = "runtimeContext";
+        Class runtimeContextDeserializationInitializationContextAdapter = null;
+        Class[] innerClazz = RuntimeContextInitializationContextAdapters.class.getDeclaredClasses();
+        for (Class clazz : innerClazz) {
+            int mod = clazz.getModifiers();
+            if (Modifier.isPrivate(mod)) {
+                if (className.equalsIgnoreCase(clazz.getSimpleName())) {
+                    runtimeContextDeserializationInitializationContextAdapter = clazz;
+                    break;
+                }
+            }
+        }
+        if (runtimeContextDeserializationInitializationContextAdapter != null) {
+            Field field = runtimeContextDeserializationInitializationContextAdapter.getDeclaredField(fieldName);
+            field.setAccessible(true);
+            RuntimeContext runtimeContext = (RuntimeContext) field.get(context);
+            metricGroup = runtimeContext.getMetricGroup();
+        } else {
+            metricGroup = context.getMetricGroup();
+        }
+        return metricGroup;
+    }
+
     @Override
     public boolean isEndOfStream(RowData nextElement) {
         return false;
@@ -146,11 +188,15 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
 
     @Override
     public void deserialize(Message<RowData> message, Collector<RowData> collector) throws IOException {
+        AtomicLong counter = new AtomicLong();
         // shortcut in case no output projection is required,
         // also not for a cartesian product with the keys
         if (keyDeserialization == null && !hasMetadata) {
-            valueDeserialization.deserialize(message.getData(), collector);
-            outputMetrics(message);
+            valueDeserialization.deserialize(message.getData(), new CallbackCollector<>(inputRow -> {
+                counter.addAndGet(1L);
+                collector.collect(inputRow);
+            }));
+            outputMetrics(counter, message);
             return;
         }
         BufferingCollector keyCollector = new BufferingCollector();
@@ -168,27 +214,30 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
             // collect tombstone messages in upsert mode by hand
             outputCollector.collect(null);
         } else {
-            valueDeserialization.deserialize(message.getData(), outputCollector);
-            outputMetrics(message);
+            valueDeserialization.deserialize(message.getData(), new CallbackCollector<>(inputRow -> {
+                counter.addAndGet(1L);
+                outputCollector.collect(inputRow);
+            }));
+            outputMetrics(counter, message);
         }
 
         keyCollector.buffer.clear();
     }
 
-    private void outputMetrics(Message<RowData> message) {
+    private void outputMetrics(AtomicLong counter, Message<RowData> message) {
         if (sourceMetricData != null) {
-            sourceMetricData.getNumRecordsIn().inc(1L);
+            sourceMetricData.getNumRecordsIn().inc(counter.get());
             sourceMetricData.getNumBytesIn()
                     .inc(message.getData().length);
         }
         if (auditImp != null) {
             auditImp.add(
-                Constants.AUDIT_SORT_INPUT,
-                inlongGroupId,
-                inlongStreamId,
-                System.currentTimeMillis(),
-                1,
-                message.getData().length);
+                    Constants.AUDIT_SORT_INPUT,
+                    inlongGroupId,
+                    inlongStreamId,
+                    System.currentTimeMillis(),
+                    counter.get(),
+                    message.getData().length);
         }
     }
 
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/CallbackCollector.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/CallbackCollector.java
new file mode 100644
index 000000000..e61b7c3f7
--- /dev/null
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/CallbackCollector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.withoutadmin;
+
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+/**
+ * A collector supporting callback.
+ */
+public class CallbackCollector<T> implements Collector<T> {
+
+    private final ThrowingConsumer<T, Exception> callback;
+
+    public CallbackCollector(ThrowingConsumer<T, Exception> callback) {
+        this.callback = callback;
+    }
+
+    @Override
+    public void collect(T t) {
+        try {
+            callback.accept(t);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close() {
+
+    }
+}