Posted to notifications@ignite.apache.org by "isapego (via GitHub)" <gi...@apache.org> on 2023/06/01 13:43:34 UTC

[GitHub] [ignite-3] isapego commented on a diff in pull request #2128: IGNITE-19540 Add Basic Data Streamer

isapego commented on code in PR #2128:
URL: https://github.com/apache/ignite-3/pull/2128#discussion_r1213050479


##########
modules/api/src/main/java/org/apache/ignite/table/DataStreamerOptions.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.table;
+
+/**
+ * Data streamer options. See {@link DataStreamerTarget} for more information.
+ */
+public class DataStreamerOptions {

Review Comment:
   Why `Options`? We ordinarily use `Configuration` in cases like this, or am I missing something?



##########
modules/api/src/main/java/org/apache/ignite/table/DataStreamerOptions.java:
##########
@@ -0,0 +1,176 @@
+
+package org.apache.ignite.table;
+
+/**
+ * Data streamer options. See {@link DataStreamerTarget} for more information.
+ */
+public class DataStreamerOptions {
+    /** Default options. */
+    public static final DataStreamerOptions DEFAULT = builder().build();
+
+    private final int batchSize;
+
+    private final int perNodeParallelOperations;
+
+    private final int autoFlushFrequency;
+
+    private final int retryLimit;
+
+    /**
+     * Constructor.
+     *
+     * @param batchSize Batch size.
+     * @param perNodeParallelOperations Per node parallel operations.
+     * @param autoFlushFrequency Auto flush frequency.
+     * @param retryLimit Retry limit.
+     */
+    private DataStreamerOptions(int batchSize, int perNodeParallelOperations, int autoFlushFrequency, int retryLimit) {
+        this.batchSize = batchSize;
+        this.perNodeParallelOperations = perNodeParallelOperations;
+        this.autoFlushFrequency = autoFlushFrequency;
+        this.retryLimit = retryLimit;
+    }
+
+    /**
+     * Creates a new builder.
+     *
+     * @return Builder.
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Gets the batch size (the number of entries that will be sent to the cluster in one network call).
+     *
+     * @return Batch size.
+     */
+    public int batchSize() {
+        return batchSize;
+    }
+
+    /**
+     * Gets the number of parallel operations per node (how many in-flight requests can be active for a given node).
+     *
+     * @return Per node parallel operations.
+     */
+    public int perNodeParallelOperations() {
+        return perNodeParallelOperations;
+    }
+
+    /**
+     * Gets the auto flush frequency, in milliseconds
+     * (the period of time after which the streamer will flush the per-node buffer even if it is not full).
+     *
+     * @return Auto flush frequency.
+     */
+    public int autoFlushFrequency() {
+        return autoFlushFrequency;
+    }
+
+    /**
+     * Gets the retry limit for a batch. If a batch fails to be sent to the cluster, the streamer will retry it a number of times.
+     * If all retries fail, the streamer will be aborted.
+     *
+     * @return Retry limit.
+     */
+    public int retryLimit() {
+        return retryLimit;
+    }
+
+    /**
+     * Builder.
+     */
+    public static class Builder {
+        private int batchSize = 1000;
+
+        private int perNodeParallelOperations = 4;
+
+        private int autoFlushFrequency = 5000;
+
+        private int retryLimit = 16;

Review Comment:
   Have we discussed defaults anywhere?
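
   For reference while discussing the defaults, here is a rough sketch of what the numbers in the diff imply, going only by the Javadoc above (batch size = entries per network call, per-node parallel operations = in-flight requests per node). The class name, the `nodeCount` parameter, and the assumption that every in-flight request carries one full batch are illustrative and not part of the PR.

```java
/** Illustration only: back-of-the-envelope numbers implied by the defaults in the diff. */
class StreamerDefaultsEstimate {
    // Values copied from the Builder fields in the diff.
    static final int BATCH_SIZE = 1000;                 // entries per network call
    static final int PER_NODE_PARALLEL_OPERATIONS = 4;  // in-flight requests per node

    /** Upper bound on entries in flight to one node, assuming one full batch per request. */
    static int maxInFlightEntriesPerNode() {
        return BATCH_SIZE * PER_NODE_PARALLEL_OPERATIONS;
    }

    /** Upper bound on entries in flight across the cluster; nodeCount is a hypothetical input. */
    static long maxInFlightEntries(int nodeCount) {
        return (long) maxInFlightEntriesPerNode() * nodeCount;
    }
}
```

   Under those assumptions the defaults allow up to 4000 entries in flight per node, which may be a useful number to have in hand when deciding whether 1000 and 4 are reasonable.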



##########
modules/client/src/main/java/org/apache/ignite/internal/client/table/StreamerBuffer.java:
##########
@@ -0,0 +1,98 @@
+
+package org.apache.ignite.internal.client.table;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+class StreamerBuffer<T> {
+    private final int capacity;
+
+    private final Function<List<T>, CompletableFuture<Void>> flusher;
+
+    /** Primary buffer. Won't grow over capacity. */
+    private List<T> buf;
+
+    private CompletableFuture<Void> flushFut;
+
+    private boolean closed;
+
+    private long lastFlushTime;
+
+    StreamerBuffer(int capacity, Function<List<T>, CompletableFuture<Void>> flusher) {
+        this.capacity = capacity;
+        this.flusher = flusher;
+        buf = new ArrayList<>(capacity);
+    }
+
+    /**
+     * Adds item to the buffer.
+     *
+     * @param item Item.
+     */
+    synchronized void add(T item) {
+        if (closed) {
+            throw new IllegalStateException("Streamer is closed, can't add items.");
+        }
+
+        buf.add(item);
+
+        if (buf.size() >= capacity) {
+            flush(buf);
+            buf = new ArrayList<>(capacity);
+        }
+    }
+
+    synchronized CompletableFuture<Void> flushAndClose() {
+        if (closed) {
+            throw new IllegalStateException("Streamer is already closed.");
+        }
+
+        closed = true;
+
+        if (!buf.isEmpty()) {
+            flush(buf);
+        }
+
+        return flushFut == null ? CompletableFuture.completedFuture(null) : flushFut;
+    }
+
+    synchronized void flush(long period) {
+        if (closed || buf.isEmpty()) {
+            return;
+        }
+
+        if (System.currentTimeMillis() - lastFlushTime > period) {

Review Comment:
   AFAIK, `System.currentTimeMillis()` is not monotonic, so there may be an issue here (for example, when the system clock is moved by an hour, whether manually or by a time synchronization service). On the other hand, I'm not sure Java guarantees a monotonic clock at all, though `System.nanoTime()` is documented as intended for measuring elapsed time, and some say it is monotonic on some platforms.
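
   A minimal sketch of what an elapsed-time check based on `System.nanoTime()` could look like. `FlushTimer`, `isDue` and `markFlushed` are hypothetical names and not part of this PR; the injectable `LongSupplier` is there only to make the logic testable. The sketch relies on the `System.nanoTime()` Javadoc, which says differences between two readings should be computed by subtraction so that numerical overflow is handled.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical helper, not part of the PR: tracks time elapsed since the last flush. */
class FlushTimer {
    private final LongSupplier nanoClock;

    private long lastFlushNanos;

    /** Uses System.nanoTime(), which is unrelated to wall-clock time. */
    FlushTimer() {
        this(System::nanoTime);
    }

    /** Accepts any nanosecond tick source, e.g. a fake clock in tests. */
    FlushTimer(LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
        this.lastFlushNanos = nanoClock.getAsLong();
    }

    /** Returns true if more than periodMillis has elapsed since the last recorded flush. */
    synchronized boolean isDue(long periodMillis) {
        // Subtract first, then compare: this stays correct even if the counter overflows.
        long elapsedNanos = nanoClock.getAsLong() - lastFlushNanos;

        return elapsedNanos > TimeUnit.MILLISECONDS.toNanos(periodMillis);
    }

    /** Records that a flush has just happened. */
    synchronized void markFlushed() {
        lastFlushNanos = nanoClock.getAsLong();
    }
}
```

   With something like this, the condition in `flush(long period)` would not depend on the wall clock, so changes to the system time should not trigger or suppress a flush.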


