Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/06/01 12:15:44 UTC

[GitHub] [ignite] isapego commented on a change in pull request #8847: IGNITE-14187 .NET Thin Client: DataStreamer

isapego commented on a change in pull request #8847:
URL: https://github.com/apache/ignite/pull/8847#discussion_r643046552



##########
File path: modules/platforms/dotnet/Apache.Ignite.Core/Client/Datastream/IDataStreamerClient.cs
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Client.Datastream
+{
+    using System;
+    using System.Threading.Tasks;
+
+    /// <summary>
+    /// Thin client data streamer.
+    /// <para />
+    /// Data streamer is an efficient and fault-tolerant way to load data into cache. Updates are buffered and mapped
+    /// to primary nodes to ensure minimal data movement and optimal resource utilization.
+    /// Update failures caused by cluster topology changes are retried automatically.
+    /// <para />
+    /// Note that the streamer sends data to remote nodes asynchronously, so cache updates can be reordered.
+    /// <para />
+    /// Instances of the implementing class are thread-safe: data can be added from multiple threads.
+    /// <para />
+    /// Closing and disposing: <see cref="IDisposable.Dispose"/> method calls <see cref="Close"/><c>(false)</c>.
+    /// This will flush any remaining data to the cache synchronously.
+    /// To avoid blocking threads when exiting <c>using()</c> block, use <see cref="CloseAsync"/>.
+    /// </summary>
+    public interface IDataStreamerClient<TK, TV> : IDisposable
+    {
+        /// <summary>
+        /// Gets the cache name.
+        /// </summary>
+        string CacheName { get; }
+
+        /// <summary>
+        /// Gets a value indicating whether this streamer is closed.
+        /// </summary>
+        bool IsClosed { get; }
+
+        /// <summary>
+        /// Gets the options.
+        /// </summary>
+        DataStreamerClientOptions<TK, TV> Options { get; }
+
+        /// <summary>
+        /// Adds an entry to the streamer.
+        /// <para />
+        /// This method adds an entry to the buffer. When the buffer gets full, it is scheduled for
+        /// asynchronous background flush. This method will block when the number of active flush operations
+        /// exceeds <see cref="DataStreamerClientOptions.PerNodeParallelOperations"/>.
+        /// </summary>
+        /// <param name="key">Key.</param>
+        /// <param name="val">Value.</param>
+        void Add(TK key, TV val);

Review comment:
       What happens if `val` is null?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerStartRequest.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.streamer;
+
+import java.util.Collection;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientLongResponse;
+import org.apache.ignite.internal.processors.platform.client.ClientPlatform;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheRequest;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.apache.ignite.stream.StreamReceiver;
+
+import static org.apache.ignite.internal.processors.platform.client.streamer.ClientDataStreamerFlags.ALLOW_OVERWRITE;
+import static org.apache.ignite.internal.processors.platform.client.streamer.ClientDataStreamerFlags.CLOSE;
+import static org.apache.ignite.internal.processors.platform.client.streamer.ClientDataStreamerFlags.FLUSH;
+import static org.apache.ignite.internal.processors.platform.client.streamer.ClientDataStreamerFlags.KEEP_BINARY;
+import static org.apache.ignite.internal.processors.platform.client.streamer.ClientDataStreamerFlags.SKIP_STORE;
+
+/**
+ *
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ClientDataStreamerStartRequest extends ClientDataStreamerRequest {
+    /** */
+    private final int cacheId;
+
+    /** */
+    private final byte flags;
+
+    /** */
+    private final int perNodeBufferSize;
+
+    /** */
+    private final int perThreadBufferSize;
+
+    /** Receiver object */
+    private final Object receiverObj;
+
+    /** Receiver platform. */
+    private final byte receiverPlatform;
+
+    /** Data entries. */
+    private final Collection<DataStreamerEntry> entries;
+
+    /**
+     * Ctor.
+     *
+     * @param reader Data reader.
+     */
+    public ClientDataStreamerStartRequest(BinaryReaderExImpl reader) {
+        super(reader);
+
+        cacheId = reader.readInt();
+        flags = reader.readByte();
+        perNodeBufferSize = reader.readInt();
+        perThreadBufferSize = reader.readInt();
+        receiverObj = reader.readObjectDetached();
+        receiverPlatform = receiverObj == null ? 0 : reader.readByte();
+        entries = ClientDataStreamerReader.read(reader);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClientResponse process(ClientConnectionContext ctx) {
+        String cacheName = ClientCacheRequest.cacheDescriptor(ctx, cacheId).cacheName();
+        DataStreamerImpl<KeyCacheObject, CacheObject> dataStreamer = (DataStreamerImpl<KeyCacheObject, CacheObject>)
+                ctx.kernalContext().grid().<KeyCacheObject, CacheObject>dataStreamer(cacheName);
+
+        try {
+            // Don't use thread buffer for a one-off streamer operation.
+            boolean close = (flags & CLOSE) != 0;
+            boolean keepBinary = (flags & KEEP_BINARY) != 0;
+            boolean useThreadBuffer = !close;
+
+            if (perNodeBufferSize >= 0)
+                dataStreamer.perNodeBufferSize(perNodeBufferSize);
+            else if (entries != null && !entries.isEmpty() && close)
+                dataStreamer.perNodeBufferSize(entries.size());
+
+            if (perThreadBufferSize >= 0 && useThreadBuffer)
+                dataStreamer.perThreadBufferSize(perThreadBufferSize);
+
+            dataStreamer.allowOverwrite((flags & ALLOW_OVERWRITE) != 0);
+            dataStreamer.skipStore((flags & SKIP_STORE) != 0);
+            dataStreamer.keepBinary(keepBinary);
+
+            if (receiverObj != null)
+                dataStreamer.receiver(createReceiver(ctx.kernalContext(), receiverObj, receiverPlatform, keepBinary));
+
+            if (entries != null)
+                dataStreamer.addDataInternal(entries, useThreadBuffer);
+
+            if ((flags & FLUSH) != 0)
+                dataStreamer.flush();
+
+            if (close) {
+                dataStreamer.close();
+
+                return new ClientLongResponse(requestId(), 0);
+            } else {
+                long rsrcId = ctx.resources().put(new ClientDataStreamerHandle(dataStreamer));
+
+                return new ClientLongResponse(requestId(), rsrcId);
+            }
+        }
+        catch (IllegalStateException unused) {
+            return getInvalidNodeStateResponse();
+        }
+    }
+
+    /**
+     * Creates the receiver.
+     *
+     * @return Receiver.

Review comment:
       Not sure if it's intentional, but you have not described the params here.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.streamer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
+
+/**
+ * Data streamer deserialization helpers.
+ */
+class ClientDataStreamerReader {
+    /**
+     * Reads streamer entries.
+     *
+     * @param reader Data reader.
+     * @return Streamer entries, or {@code null} if the entry count is zero.
+     */
+    public static Collection<DataStreamerEntry> read(BinaryReaderExImpl reader) {
+        int entriesCnt = reader.readInt();
+
+        if (entriesCnt == 0)
+            return null;
+
+        Collection<DataStreamerEntry> entries = new ArrayList<>(entriesCnt);
+
+        for (int i = 0; i < entriesCnt; i++) {
+            entries.add(new DataStreamerEntry(readCacheObject(reader, true),
+                    readCacheObject(reader, false)));
+        }
+
+        return entries;
+    }
+
+    /**
+     * Read cache object from the stream as raw bytes to avoid marshalling.
+     */
+    private static <T extends CacheObject> T readCacheObject(BinaryReaderExImpl reader, boolean isKey) {
+        BinaryInputStream in = reader.in();
+
+        int pos0 = in.position();
+
+        Object obj = reader.readObjectDetached();

Review comment:
       So we still read the object here just to find out its size. What's the point?
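
For context, the position-delta technique the quoted code appears to use can be sketched in isolation. This is a minimal illustration, not the Ignite implementation: `RawSliceSketch`, `readValue`, `readValueWithRawBytes`, `encode`, and the toy length-prefixed format are all hypothetical, and `java.nio.ByteBuffer` stands in for `BinaryInputStream`. It records the position before and after decoding one value, then copies the bytes in between.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch: capture the raw bytes of a value by measuring how far the decoder advanced. */
final class RawSliceSketch {
    /** Toy decoder (assumed format): a 4-byte length followed by that many UTF-8 bytes. */
    static String readValue(ByteBuffer in) {
        int len = in.getInt();
        byte[] data = new byte[len];
        in.get(data);
        return new String(data, StandardCharsets.UTF_8);
    }

    /** Decodes one value into decodedOut[0] and returns the exact bytes it occupied in the buffer. */
    static byte[] readValueWithRawBytes(ByteBuffer in, String[] decodedOut) {
        int pos0 = in.position();          // start of the value
        decodedOut[0] = readValue(in);     // decoding is what locates the end of the value
        int pos1 = in.position();          // end of the value

        byte[] raw = new byte[pos1 - pos0];
        in.position(pos0);                 // rewind to the start of the value
        in.get(raw);                       // copy the raw bytes; position is back at pos1
        return raw;
    }

    /** Test helper: encodes the given strings in the toy format. */
    static ByteBuffer encode(String... values) {
        ByteBuffer buf = ByteBuffer.allocate(256);
        for (String v : values) {
            byte[] data = v.getBytes(StandardCharsets.UTF_8);
            buf.putInt(data.length).put(data);
        }
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer buf = encode("key", "value");
        String[] decoded = new String[1];
        byte[] raw = readValueWithRawBytes(buf, decoded);
        System.out.println(decoded[0] + " occupies " + raw.length + " bytes");
    }
}
```

In this sketch the decode step exists only to find where the value ends, which is the cost the comment above asks about. A format that exposes the encoded length up front could skip the decode and slice the bytes directly. Whether the Ignite binary format allows that here is not established by this thread.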

##########
File path: modules/platforms/dotnet/Apache.Ignite.Core/Client/Datastream/DataStreamerClientOptions.cs
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Client.Datastream
+{
+    using System;
+    using Apache.Ignite.Core.Datastream;
+    using Apache.Ignite.Core.Impl.Common;
+
+    /// <summary>
+    /// Thin client data streamer options.
+    /// <para />
+    /// To set a receiver, use generic class <see cref="DataStreamerClientOptions{K,V}"/>.
+    /// <para />
+    /// See also <see cref="IDataStreamerClient{TK,TV}"/>, <see cref="IIgniteClient.GetDataStreamer{TK,TV}(string)"/>.
+    /// </summary>
+    public class DataStreamerClientOptions
+    {
+        /// <summary>
+        /// The default client-side per-node buffer size,

Review comment:
       It is probably worth mentioning the units of this value (I believe it is the number of entities)?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerStartRequest.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.streamer;
+
+import java.util.Collection;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientLongResponse;
+import org.apache.ignite.internal.processors.platform.client.ClientPlatform;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheRequest;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.apache.ignite.stream.StreamReceiver;
+
+import static org.apache.ignite.internal.processors.platform.client.streamer.ClientDataStreamerFlags.ALLOW_OVERWRITE;
+import static org.apache.ignite.internal.processors.platform.client.streamer.ClientDataStreamerFlags.CLOSE;
+import static org.apache.ignite.internal.processors.platform.client.streamer.ClientDataStreamerFlags.FLUSH;
+import static org.apache.ignite.internal.processors.platform.client.streamer.ClientDataStreamerFlags.KEEP_BINARY;
+import static org.apache.ignite.internal.processors.platform.client.streamer.ClientDataStreamerFlags.SKIP_STORE;
+
+/**
+ *
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ClientDataStreamerStartRequest extends ClientDataStreamerRequest {
+    /** */
+    private final int cacheId;
+
+    /** */
+    private final byte flags;
+
+    /** */
+    private final int perNodeBufferSize;
+
+    /** */
+    private final int perThreadBufferSize;
+
+    /** Receiver object */
+    private final Object receiverObj;
+
+    /** Receiver platform. */
+    private final byte receiverPlatform;
+
+    /** Data entries. */
+    private final Collection<DataStreamerEntry> entries;
+
+    /**
+     * Ctor.
+     *
+     * @param reader Data reader.
+     */
+    public ClientDataStreamerStartRequest(BinaryReaderExImpl reader) {
+        super(reader);
+
+        cacheId = reader.readInt();
+        flags = reader.readByte();
+        perNodeBufferSize = reader.readInt();
+        perThreadBufferSize = reader.readInt();
+        receiverObj = reader.readObjectDetached();
+        receiverPlatform = receiverObj == null ? 0 : reader.readByte();
+        entries = ClientDataStreamerReader.read(reader);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClientResponse process(ClientConnectionContext ctx) {
+        String cacheName = ClientCacheRequest.cacheDescriptor(ctx, cacheId).cacheName();
+        DataStreamerImpl<KeyCacheObject, CacheObject> dataStreamer = (DataStreamerImpl<KeyCacheObject, CacheObject>)
+                ctx.kernalContext().grid().<KeyCacheObject, CacheObject>dataStreamer(cacheName);
+
+        try {
+            // Don't use thread buffer for a one-off streamer operation.
+            boolean close = (flags & CLOSE) != 0;
+            boolean keepBinary = (flags & KEEP_BINARY) != 0;
+            boolean useThreadBuffer = !close;
+
+            if (perNodeBufferSize >= 0)
+                dataStreamer.perNodeBufferSize(perNodeBufferSize);
+            else if (entries != null && !entries.isEmpty() && close)
+                dataStreamer.perNodeBufferSize(entries.size());
+
+            if (perThreadBufferSize >= 0 && useThreadBuffer)
+                dataStreamer.perThreadBufferSize(perThreadBufferSize);
+
+            dataStreamer.allowOverwrite((flags & ALLOW_OVERWRITE) != 0);
+            dataStreamer.skipStore((flags & SKIP_STORE) != 0);

Review comment:
       Wouldn't it be more consistent to define those checks as variables at the beginning of the block as well? I understand each of them is used in only one place, but it would look more consistent and readable to my liking. Anyway, up to you.
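
A minimal sketch of what this suggestion could look like, with every flag decoded into a named boolean in one place. The class `StreamerFlagsSketch` is hypothetical, and the bit values below are assumptions for illustration only. The real constants live in `ClientDataStreamerFlags`, whose values are not shown in this diff.

```java
/** Hypothetical sketch: decode all flag bits up front so later code reads named booleans only. */
final class StreamerFlagsSketch {
    // Assumed bit values, for illustration only (not taken from the diff).
    static final byte ALLOW_OVERWRITE = 0x01;
    static final byte SKIP_STORE = 0x02;
    static final byte KEEP_BINARY = 0x04;
    static final byte FLUSH = 0x08;
    static final byte CLOSE = 0x10;

    final boolean allowOverwrite;
    final boolean skipStore;
    final boolean keepBinary;
    final boolean flush;
    final boolean close;

    StreamerFlagsSketch(byte flags) {
        allowOverwrite = (flags & ALLOW_OVERWRITE) != 0;
        skipStore = (flags & SKIP_STORE) != 0;
        keepBinary = (flags & KEEP_BINARY) != 0;
        flush = (flags & FLUSH) != 0;
        close = (flags & CLOSE) != 0;
    }

    public static void main(String[] args) {
        StreamerFlagsSketch f = new StreamerFlagsSketch((byte) (ALLOW_OVERWRITE | CLOSE));
        System.out.println("allowOverwrite=" + f.allowOverwrite
            + " skipStore=" + f.skipStore + " close=" + f.close);
    }
}
```

With the checks grouped like this, the calls in `process` would become `dataStreamer.allowOverwrite(allowOverwrite)` and `dataStreamer.skipStore(skipStore)`, matching how `close` and `keepBinary` are already handled.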



