Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/04/29 09:11:29 UTC

[GitHub] [pulsar] lhotari commented on a diff in pull request #15264: [improve][build] PIP-156 Build Pulsar Server on Java 17

lhotari commented on code in PR #15264:
URL: https://github.com/apache/pulsar/pull/15264#discussion_r861616726


##########
README.md:
##########
@@ -83,10 +83,27 @@ components in the Pulsar ecosystem, including connectors, adapters, and other la
 
 - [Pulsar CI](https://github.com/apache/pulsar-test-infra)
 
+## Pulsar Runtime Java Version Recommendation
+
+pulsar ver 2.11  >=
+
+| Pulsar Components | Java Version  |
+| ----------------- |:-------------:|
+| Broker            | 17            |
+| CLI               | 17            |
+| Java Client       | 8 or 11 or 17 |
+
+pulsar ver 2.11 <
+
+| Pulsar Components | Java Version  |
+| ----------------- |:-------------:|
+| All               | 8 or 11       |

Review Comment:
   For Broker, the recommended runtime environment has been Java 11 since the Pulsar 2.8.0 release.
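   To make the proposed 2.11+ table concrete, here is a minimal sketch of checking the running JVM against it. It assumes the thresholds from the table in the diff above (Broker 17, CLI 17, Java Client 8); the class and method names are hypothetical and not part of Pulsar. It reads the standard `java.specification.version` system property, which is `1.8` on Java 8 and `11`, `17`, and so on for later releases.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class JavaVersionCheck {
    // Hypothetical minimum Java feature versions per component, taken from
    // the proposed "Pulsar 2.11 and later" table. Not a Pulsar API.
    static final Map<String, Integer> MIN_FEATURE_VERSION = new LinkedHashMap<>();
    static {
        MIN_FEATURE_VERSION.put("Broker", 17);
        MIN_FEATURE_VERSION.put("CLI", 17);
        MIN_FEATURE_VERSION.put("Java Client", 8);
    }

    /** Converts "1.8" to 8 and "11"/"17" to 11/17. */
    static int featureVersion(String spec) {
        if (spec.startsWith("1.")) {
            return Integer.parseInt(spec.substring(2));
        }
        return Integer.parseInt(spec);
    }

    /** True if the given Java spec version meets the component's minimum. */
    static boolean meetsRecommendation(String component, String spec) {
        Integer min = MIN_FEATURE_VERSION.get(component);
        if (min == null) {
            throw new IllegalArgumentException("Unknown component: " + component);
        }
        return featureVersion(spec) >= min;
    }

    public static void main(String[] args) {
        String spec = System.getProperty("java.specification.version");
        for (String component : MIN_FEATURE_VERSION.keySet()) {
            System.out.println(component + " on Java " + spec + ": "
                    + meetsRecommendation(component, spec));
        }
    }
}
```

   Parsing the specification version string rather than calling `Runtime.version()` keeps the sketch usable on Java 8, which the client row still lists.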



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java:
##########
@@ -34,7 +34,7 @@
  * ManagedLedgerInfo compression configuration test.
  */
 
-@Test(groups = {"broker", "broker-jdk8"})
+@Test(groups = {"broker", "broker-jdk17"})

Review Comment:
   Just remove it. This test was marked for broker-jdk8 because a behavior change between Java 8 and Java 11 showed up in it. That is now obsolete, so remove the broker-jdk8 group completely.



##########
tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java:
##########
@@ -189,38 +194,56 @@ private void internalValidateSinkResult(Map<String, String> kvs) {
 
         Map<String, String> actualKvs = new LinkedHashMap<>();
 
-        // millisBehindLatest equals zero when record processing is caught up,
-        // and there are no new records to process at this moment.
-        // See https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html#Streams-GetRecords-response-MillisBehindLatest
-        Awaitility.await().until(() -> addMoreRecordsAndGetMillisBehindLatest(actualKvs, iterator) == 0);
+        addMoreRecords(actualKvs, iterator);
 
         assertEquals(actualKvs, kvs);
     }
 
     @SneakyThrows
-    private Long addMoreRecordsAndGetMillisBehindLatest(Map<String, String> kvs, String iterator) {
-        final GetRecordsResponse response = client.getRecords(
-                GetRecordsRequest
-                        .builder()
-                        .shardIterator(iterator)
-                        .build())
-                .get();
-        if(response.hasRecords()) {
-            for (Record record : response.records()) {
-                String data = record.data().asString(StandardCharsets.UTF_8);
-                if (withSchema) {
-                    JsonNode payload = READER.readTree(data).at("/payload");
-                    String i = payload.at("/value/field1").asText();
-                    assertEquals(payload.at("/value/field2").asText(), "v2_" + i);
-                    assertEquals(payload.at("/key/field1").asText(), "f1_" + i);
-                    assertEquals(payload.at("/key/field2").asText(), "f2_" + i);
-                    kvs.put(i, i);
-                } else {
-                    kvs.put(record.partitionKey(), data);
+    private void parseRecordData(Map<String, String> actualKvs, String data, String partitionKey) {
+        if (withSchema) {
+            JsonNode payload = READER.readTree(data).at("/payload");
+            String i = payload.at("/value/field1").asText();
+            assertEquals(payload.at("/value/field2").asText(), "v2_" + i);
+            assertEquals(payload.at("/key/field1").asText(), "f1_" + i);
+            assertEquals(payload.at("/key/field2").asText(), "f2_" + i);
+            actualKvs.put(i, i);
+        } else {
+            actualKvs.put(partitionKey, data);
+        }
+    }
+
+    @SneakyThrows
+    private void addMoreRecords(Map<String, String> actualKvs, String iterator) {
+        GetRecordsResponse response;
+        List<KinesisClientRecord> aggRecords = new ArrayList<>();
+        do {
+            GetRecordsRequest request = GetRecordsRequest.builder().shardIterator(iterator).build();
+            response = client.getRecords(request).get();
+            if (response.hasRecords()) {
+                for (Record record : response.records()) {
+                    // KinesisSink uses KPL with aggregation enabled (by default).
+                    // However, due to the async state initialization of the KPL internal ShardMap,
+                    // the first sinked records might not be aggregated in Kinesis.
+                    // ref: https://github.com/awslabs/amazon-kinesis-producer/issues/131
+                    try {
+                        String data = record.data().asString(StandardCharsets.UTF_8);
+                        parseRecordData(actualKvs, data, record.partitionKey());
+                    } catch (UncheckedIOException e) {
+                        aggRecords.add(KinesisClientRecord.fromRecord(record));
+                    }
                 }
             }
+            iterator = response.nextShardIterator();
+            // millisBehindLatest equals zero when record processing is caught up,
+            // and there are no new records to process at this moment.
+            // See https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html#Streams-GetRecords-response-MillisBehindLatest
+        } while (response.millisBehindLatest() != 0);
+
+        for (KinesisClientRecord record : new AggregatorUtil().deaggregate(aggRecords)) {
+            String data = new String(record.data().array(), StandardCharsets.UTF_8);
+            parseRecordData(actualKvs, data, record.partitionKey());

Review Comment:
   Please remove this from this PR. Submit as a separate PR.
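   For context on the loop being moved out of this PR: the new `addMoreRecords` keeps calling GetRecords, follows `nextShardIterator` after each call (the removed Awaitility version passed the same `iterator` on every call), and stops once `millisBehindLatest` is 0. A minimal sketch of that control flow follows; `FakeShard` and `Batch` are hypothetical in-memory stand-ins for the Kinesis client and `GetRecordsResponse`, the lag value is simulated, and the KPL deaggregation step is omitted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ShardDrainSketch {
    // Hypothetical stand-in for GetRecordsResponse.
    static final class Batch {
        final List<String[]> records; // each element: {partitionKey, data}
        final int nextIterator;       // stand-in for nextShardIterator()
        final long millisBehindLatest;

        Batch(List<String[]> records, int nextIterator, long millisBehindLatest) {
            this.records = records;
            this.nextIterator = nextIterator;
            this.millisBehindLatest = millisBehindLatest;
        }
    }

    // Hypothetical in-memory shard: each call returns up to batchSize records
    // starting at the iterator position.
    static final class FakeShard {
        private final List<String[]> data;
        private final int batchSize;

        FakeShard(List<String[]> data, int batchSize) {
            this.data = data;
            this.batchSize = batchSize;
        }

        Batch getRecords(int iterator) {
            int end = Math.min(iterator + batchSize, data.size());
            List<String[]> slice = new ArrayList<>(data.subList(iterator, end));
            // Simulated lag: non-zero until the last record has been returned.
            long behind = end < data.size() ? 1000L : 0L;
            return new Batch(slice, end, behind);
        }
    }

    // Same shape as the do/while in the diff: read, advance the iterator,
    // and repeat until the reader reports it has caught up.
    static Map<String, String> drain(FakeShard shard, int iterator) {
        Map<String, String> kvs = new LinkedHashMap<>();
        Batch batch;
        do {
            batch = shard.getRecords(iterator);
            for (String[] record : batch.records) {
                kvs.put(record[0], record[1]);
            }
            iterator = batch.nextIterator;
        } while (batch.millisBehindLatest != 0);
        return kvs;
    }

    public static void main(String[] args) {
        List<String[]> data = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            data.add(new String[] {"k" + i, "v" + i});
        }
        // Prints {k0=v0, k1=v1, k2=v2, k3=v3, k4=v4}
        System.out.println(drain(new FakeShard(data, 2), 0));
    }
}
```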



##########
build/run_unit_group.sh:
##########
@@ -78,8 +78,8 @@ function test_group_broker_client_impl() {
   mvn_test -pl pulsar-broker -Dgroups='broker-impl'
 }
 
-function test_group_broker_jdk8() {
-  mvn_test -pl pulsar-broker -Dgroups='broker-jdk8' -Dpulsar.allocator.pooled=true
+function test_group_broker_jdk17() {

Review Comment:
   > but it might be useful in the future when we start supporting jdk18+
   
   YAGNI. I think it's better not to plan that far ahead. We can add that solution if and when it's needed.



##########
tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java:
##########
@@ -189,38 +194,56 @@ private void internalValidateSinkResult(Map<String, String> kvs) {
 
         Map<String, String> actualKvs = new LinkedHashMap<>();
 
-        // millisBehindLatest equals zero when record processing is caught up,
-        // and there are no new records to process at this moment.
-        // See https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html#Streams-GetRecords-response-MillisBehindLatest
-        Awaitility.await().until(() -> addMoreRecordsAndGetMillisBehindLatest(actualKvs, iterator) == 0);
+        addMoreRecords(actualKvs, iterator);

Review Comment:
   Please remove it from this change. Submit it as a separate PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org