Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/06/01 15:20:41 UTC

[GitHub] [ignite] alex-plekhanov commented on a change in pull request #8847: IGNITE-14187 .NET Thin Client: DataStreamer

alex-plekhanov commented on a change in pull request #8847:
URL: https://github.com/apache/ignite/pull/8847#discussion_r643141181



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.streamer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
+
+/**
+ * Data streamer deserialization helpers.
+ */
+class ClientDataStreamerReader {
+    /**
+     * Reads an entry.
+     *
+     * @param reader Data reader.
+     * @return Streamer entry.
+     */
+    public static Collection<DataStreamerEntry> read(BinaryReaderExImpl reader) {
+        int entriesCnt = reader.readInt();
+
+        if (entriesCnt == 0)
+            return null;
+
+        Collection<DataStreamerEntry> entries = new ArrayList<>(entriesCnt);
+
+        for (int i = 0; i < entriesCnt; i++) {
+            entries.add(new DataStreamerEntry(readCacheObject(reader, true),
+                    readCacheObject(reader, false)));
+        }
+
+        return entries;
+    }
+
+    /**
+     * Read cache object from the stream as raw bytes to avoid marshalling.
+     */
+    private static <T extends CacheObject> T readCacheObject(BinaryReaderExImpl reader, boolean isKey) {
+        BinaryInputStream in = reader.in();
+
+        int pos0 = in.position();
+
+        Object obj = reader.readObjectDetached();
+
+        if (obj == null)
+            return null;
+
+        int pos1 = in.position();
+
+        in.position(pos0);
+
+        byte[] objBytes = in.readByteArray(pos1 - pos0);
+
+        return isKey ? (T) new KeyCacheObjectImpl(obj, objBytes, -1) : (T) new CacheObjectImpl(obj, objBytes);
+    }
+

Review comment:
       Redundant blank line.
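       For context, `readCacheObject` above captures the serialized form of an object by remembering the stream position before and after deserialization and then re-reading that range as raw bytes. A minimal, self-contained sketch of the same idea follows. It uses a plain `java.nio.ByteBuffer` and a made-up length-prefixed string format rather than Ignite's `BinaryInputStream`, and `RawCaptureSketch` is a hypothetical name that is not part of the PR.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical holder: a decoded value plus the exact bytes it was decoded from. */
final class RawCaptureSketch {
    final String value;
    final byte[] raw;

    RawCaptureSketch(String value, byte[] raw) {
        this.value = value;
        this.raw = raw;
    }

    /** Reads one length-prefixed UTF-8 string (assumed format) and captures its serialized bytes. */
    static RawCaptureSketch read(ByteBuffer in) {
        int pos0 = in.position();            // Start of the serialized object.

        int len = in.getInt();               // Deserialize: 4-byte length, then payload.
        byte[] payload = new byte[len];
        in.get(payload);
        String value = new String(payload, StandardCharsets.UTF_8);

        int pos1 = in.position();            // End of the serialized object.

        in.position(pos0);                   // Rewind to the start of the object.
        byte[] raw = new byte[pos1 - pos0];
        in.get(raw);                         // Copy the same range; position ends at pos1 again.

        return new RawCaptureSketch(value, raw);
    }

    public static void main(String[] args) {
        byte[] payload = "key".getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(payload.length).put(payload).flip();

        RawCaptureSketch c = read(buf);
        System.out.println(c.value + " captured as " + c.raw.length + " raw bytes");
    }
}
```

       Because the second read ends exactly where deserialization ended, the next entry can be read from the same buffer without any extra bookkeeping.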

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerStartRequest.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.streamer;
+
+import java.util.Collection;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientLongResponse;
+import org.apache.ignite.internal.processors.platform.client.ClientPlatform;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheRequest;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.apache.ignite.stream.StreamReceiver;
+
+import static org.apache.ignite.internal.processors.platform.client.streamer.ClientDataStreamerFlags.ALLOW_OVERWRITE;
+import static org.apache.ignite.internal.processors.platform.client.streamer.ClientDataStreamerFlags.CLOSE;
+import static org.apache.ignite.internal.processors.platform.client.streamer.ClientDataStreamerFlags.FLUSH;
+import static org.apache.ignite.internal.processors.platform.client.streamer.ClientDataStreamerFlags.KEEP_BINARY;
+import static org.apache.ignite.internal.processors.platform.client.streamer.ClientDataStreamerFlags.SKIP_STORE;
+
+/**
+ *
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ClientDataStreamerStartRequest extends ClientDataStreamerRequest {
+    /** */
+    private final int cacheId;
+
+    /** */
+    private final byte flags;
+
+    /** */
+    private final int perNodeBufferSize;
+
+    /** */
+    private final int perThreadBufferSize;
+
+    /** Receiver object */
+    private final Object receiverObj;
+
+    /** Receiver platform. */
+    private final byte receiverPlatform;
+
+    /** Data entries. */
+    private final Collection<DataStreamerEntry> entries;
+
+    /**
+     * Ctor.
+     *
+     * @param reader Data reader.
+     */
+    public ClientDataStreamerStartRequest(BinaryReaderExImpl reader) {
+        super(reader);
+
+        cacheId = reader.readInt();
+        flags = reader.readByte();
+        perNodeBufferSize = reader.readInt();
+        perThreadBufferSize = reader.readInt();
+        receiverObj = reader.readObjectDetached();
+        receiverPlatform = receiverObj == null ? 0 : reader.readByte();
+        entries = ClientDataStreamerReader.read(reader);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClientResponse process(ClientConnectionContext ctx) {
+        String cacheName = ClientCacheRequest.cacheDescriptor(ctx, cacheId).cacheName();
+        DataStreamerImpl<KeyCacheObject, CacheObject> dataStreamer = (DataStreamerImpl<KeyCacheObject, CacheObject>)
+                ctx.kernalContext().grid().<KeyCacheObject, CacheObject>dataStreamer(cacheName);
+
+        try {
+            // Don't use thread buffer for a one-off streamer operation.
+            boolean close = (flags & CLOSE) != 0;
+            boolean keepBinary = (flags & KEEP_BINARY) != 0;
+            boolean useThreadBuffer = !close;
+
+            if (perNodeBufferSize >= 0)
+                dataStreamer.perNodeBufferSize(perNodeBufferSize);
+            else if (entries != null && !entries.isEmpty() && close)
+                dataStreamer.perNodeBufferSize(entries.size());
+
+            if (perThreadBufferSize >= 0 && useThreadBuffer)
+                dataStreamer.perThreadBufferSize(perThreadBufferSize);
+
+            dataStreamer.allowOverwrite((flags & ALLOW_OVERWRITE) != 0);
+            dataStreamer.skipStore((flags & SKIP_STORE) != 0);
+            dataStreamer.keepBinary(keepBinary);
+
+            if (receiverObj != null)
+                dataStreamer.receiver(createReceiver(ctx.kernalContext(), receiverObj, receiverPlatform, keepBinary));
+
+            if (entries != null)
+                dataStreamer.addDataInternal(entries, useThreadBuffer);
+
+            if ((flags & FLUSH) != 0)
+                dataStreamer.flush();
+
+            if (close) {
+                dataStreamer.close();
+
+                return new ClientLongResponse(requestId(), 0);
+            } else {
+                long rsrcId = ctx.resources().put(new ClientDataStreamerHandle(dataStreamer));
+
+                return new ClientLongResponse(requestId(), rsrcId);
+            }
+        }
+        catch (IllegalStateException unused) {

Review comment:
       I'm not sure we should treat every "illegal state" exception as "node stopping". Perhaps we should reuse the logic from `ClientRequestHandler.getStatus()` here.
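       To illustrate the concern, here is a rough sketch of classifying an exception by what actually caused it instead of by its top-level type. Everything in it is hypothetical: `NodeStoppingSketchException`, `StatusSketch` and the status constants are stand-ins invented for this example, and it does not reproduce what `ClientRequestHandler.getStatus()` really does.

```java
/** Hypothetical marker exception meaning "this node is shutting down". */
final class NodeStoppingSketchException extends Exception {
    NodeStoppingSketchException(String msg) {
        super(msg);
    }
}

/** Hypothetical status mapping; the codes are made up for this sketch. */
final class StatusSketch {
    static final int FAILED = 1;
    static final int NODE_STOPPING = 2;

    /** Reports NODE_STOPPING only when the cause chain contains the marker exception. */
    static int statusOf(Throwable e) {
        for (Throwable t = e; t != null; t = t.getCause()) {
            if (t instanceof NodeStoppingSketchException)
                return NODE_STOPPING;
        }

        // Any other failure, including an unrelated IllegalStateException, stays a generic error.
        return FAILED;
    }

    public static void main(String[] args) {
        Throwable stopping = new IllegalStateException("streamer closed",
            new NodeStoppingSketchException("node is stopping"));
        Throwable unrelated = new IllegalStateException("some other illegal state");

        System.out.println(statusOf(stopping) == NODE_STOPPING);
        System.out.println(statusOf(unrelated) == FAILED);
    }
}
```

       With a check like this, a blanket `catch (IllegalStateException unused)` would no longer report unrelated failures to the client as a stopping node.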

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerStartRequest.java
##########
@@ -0,0 +1,170 @@
+/**
+ *

Review comment:
       Javadoc is missing here: the class comment is empty.

##########
File path: modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Datastream/DataStreamerClientTest.cs
##########
@@ -0,0 +1,831 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Client.Datastream
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Diagnostics;
+    using System.Linq;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Apache.Ignite.Core.Binary;
+    using Apache.Ignite.Core.Cache;
+    using Apache.Ignite.Core.Cache.Configuration;
+    using Apache.Ignite.Core.Cache.Store;
+    using Apache.Ignite.Core.Client;
+    using Apache.Ignite.Core.Client.Datastream;
+    using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Datastream;
+    using Apache.Ignite.Core.Impl.Client.Datastream;
+    using NUnit.Framework;
+
+    /// <summary>
+    /// Tests for <see cref="IDataStreamerClient{TK,TV}"/>.
+    /// </summary>
+    public class DataStreamerClientTest : ClientTestBase
+    {
+        /** */
+        private const int GridCount = 3;
+
+        /// <summary>
+        /// Initializes a new instance of <see cref="DataStreamerClientTest"/>.
+        /// </summary>
+        public DataStreamerClientTest()
+            : this(false)
+        {
+            // No-op.
+        }
+
+        /// <summary>
+        /// Initializes a new instance of <see cref="DataStreamerClientTest"/>.
+        /// </summary>
+        public DataStreamerClientTest(bool enablePartitionAwareness)
+            : base(GridCount, enableSsl: false, enablePartitionAwareness: enablePartitionAwareness)
+        {
+            // No-op.
+        }
+
+        /// <summary>
+        /// Tests basic streaming with default options.
+        /// </summary>
+        [Test]
+        public void TestBasicStreaming()
+        {
+            var cache = GetClientCache<string>();
+
+            using (var streamer = Client.GetDataStreamer<int, string>(cache.Name))
+            {
+                Assert.AreEqual(cache.Name, streamer.CacheName);
+
+                streamer.Add(1, "1");
+                streamer.Add(2, "2");
+            }
+
+            Assert.AreEqual("1", cache[1]);
+            Assert.AreEqual("2", cache[2]);
+        }
+
+        /// <summary>
+        /// Tests add and remove operations combined.
+        /// </summary>
+        [Test]
+        public void TestAddRemoveOverwrite()
+        {
+            var cache = GetClientCache<int>();
+            cache.PutAll(Enumerable.Range(1, 10).ToDictionary(x => x, x => x + 1));
+
+            var options = new DataStreamerClientOptions {AllowOverwrite = true};
+
+            using (var streamer = Client.GetDataStreamer<int, int>(cache.Name, options))
+            {
+                streamer.Add(1, 11);
+                streamer.Add(20, 20);
+
+                foreach (var key in new[] {2, 4, 6, 7, 8, 9, 10})
+                {
+                    streamer.Remove(key);
+                }
+            }
+
+            var resKeys = cache.GetAll(Enumerable.Range(1, 30))
+                .Select(x => x.Key)
+                .OrderBy(x => x)
+                .ToArray();
+
+            Assert.AreEqual(11, cache.Get(1));
+            Assert.AreEqual(20, cache.Get(20));
+            Assert.AreEqual(4, cache.GetSize());
+            Assert.AreEqual(new[] {1, 3, 5, 20}, resKeys);
+        }
+
+        /// <summary>
+        /// Tests automatic flush when buffer gets full.
+        /// </summary>
+        [Test]
+        public void TestAutoFlushOnFullBuffer()
+        {
+            var cache = GetClientCache<string>();
+            var keys = TestUtils.GetPrimaryKeys(GetIgnite(), cache.Name).Take(10).ToArray();
+
+            // Set server buffers to 1 so that server always flushes the data.
+            var options = new DataStreamerClientOptions<int, int>
+            {
+                PerNodeBufferSize = 3
+            };
+
+            using (var streamer = Client.GetDataStreamer(
+                cache.Name,
+                options))
+            {
+                streamer.Add(keys[1], 1);
+                Assert.AreEqual(0, cache.GetSize());
+
+                streamer.Add(keys[2], 2);
+                Assert.AreEqual(0, cache.GetSize());
+
+                streamer.Add(keys[3], 3);
+                TestUtils.WaitForTrueCondition(() => cache.GetSize() == 3);
+            }
+        }
+
+        /// <summary>
+        /// Tests manual (explicit) flush.
+        /// </summary>
+        [Test]
+        public void TestManualFlush()
+        {
+            var cache = GetClientCache<int>();
+
+            using (var streamer = Client.GetDataStreamer<int, int>(cache.Name))
+            {
+                streamer.Add(1, 1);
+                streamer.Add(2, 2);
+
+                streamer.Flush();
+
+                streamer.Add(3, 3);
+
+                Assert.AreEqual(2, cache.GetSize());
+                Assert.AreEqual(1, cache[1]);
+                Assert.AreEqual(2, cache[2]);
+
+                streamer.Flush();
+                Thread.Sleep(500);
+
+                Assert.AreEqual(3, cache.GetSize());
+                Assert.AreEqual(3, cache[3]);
+            }
+        }
+
+        /// <summary>
+        /// Tests that <see cref="IDataStreamerClient{TK,TV}.Remove"/> throws an exception when
+        /// <see cref="DataStreamerClientOptions.AllowOverwrite"/> is not enabled.
+        /// </summary>
+        [Test]
+        public void TestRemoveNoAllowOverwriteThrows()
+        {
+            var cache = GetClientCache<string>();
+
+            using (var streamer = Client.GetDataStreamer<int, string>(cache.Name))
+            {
+                var ex = Assert.Throws<IgniteClientException>(() => streamer.Remove(1));
+
+                Assert.AreEqual("DataStreamer can't remove data when AllowOverwrite is false.", ex.Message);
+            }
+        }
+
+        /// <summary>
+        /// Tests streaming of relatively long list of entries to verify multiple buffer flush correctness.
+        /// </summary>
+        [Test]
+        [Category(TestUtils.CategoryIntensive)]
+        public void TestStreamLongList()
+        {
+            var cache = GetClientCache<int>();
+            const int count = 50000;
+
+            using (var streamer = Client.GetDataStreamer<int, int>(cache.Name))
+            {
+                for (var k = 0; k < count; k++)
+                {
+                    streamer.Add(k, -k);
+                }
+            }
+
+            Assert.AreEqual(count, cache.GetSize());
+            Assert.AreEqual(-2, cache[2]);
+            Assert.AreEqual(-200, cache[200]);
+        }
+
+        /// <summary>
+        /// Tests streamer usage from multiple threads.
+        /// </summary>
+        [Test]
+        [Category(TestUtils.CategoryIntensive)]
+        public void TestStreamMultithreaded()
+        {
+            var cache = GetClientCache<int>();
+            const int count = 250000;
+            int id = 0;
+
+            using (var streamer = Client.GetDataStreamer<int, int>(cache.Name))
+            {
+                TestUtils.RunMultiThreaded(() =>
+                {
+                    while (true)
+                    {
+                        var key = Interlocked.Increment(ref id);
+
+                        if (key > count)
+                        {
+                            break;
+                        }
+
+                        // ReSharper disable once AccessToDisposedClosure
+                        streamer.Add(key, key + 2);
+                    }
+                }, 8);
+            }
+
+            Assert.AreEqual(count, cache.GetSize());
+            Assert.AreEqual(4, cache[2]);
+            Assert.AreEqual(22, cache[20]);
+        }
+
+        /// <summary>
+        /// Tests streamer usage with Parallel.For, which dynamically allocates threads to perform work.
+        /// This test verifies backpressure behavior quite well.
+        /// </summary>
+        [Test]
+        [Category(TestUtils.CategoryIntensive)]
+        public void TestStreamParallelFor()
+        {
+            var cache = GetClientCache<int>();
+            const int count = 250000;
+
+            using (var streamer = Client.GetDataStreamer<int, int>(cache.Name))
+            {
+                // ReSharper disable once AccessToDisposedClosure
+                Parallel.For(0, count, i => streamer.Add(i, i + 2));
+
+                streamer.Flush();
+                CheckArrayPoolLeak(streamer);
+            }
+
+            var size = cache.GetSize();
+            if (size != count)

Review comment:
       I don't quite understand: why can we lose some entries in this test?

##########
File path: modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Datastream/DataStreamerClientTest.cs
##########
@@ -0,0 +1,831 @@
+        /// <summary>
+        /// Tests manual (explicit) flush.
+        /// </summary>
+        [Test]
+        public void TestManualFlush()
+        {
+            var cache = GetClientCache<int>();
+
+            using (var streamer = Client.GetDataStreamer<int, int>(cache.Name))
+            {
+                streamer.Add(1, 1);
+                streamer.Add(2, 2);
+
+                streamer.Flush();
+
+                streamer.Add(3, 3);
+
+                Assert.AreEqual(2, cache.GetSize());
+                Assert.AreEqual(1, cache[1]);
+                Assert.AreEqual(2, cache[2]);
+
+                streamer.Flush();
+                Thread.Sleep(500);

Review comment:
       Why do we need `Sleep` here? I think that once the flush response has been received, the data should already be in the cache.
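       If a wait does turn out to be necessary (which this comment doubts), a bounded poll in the spirit of the `TestUtils.WaitForTrueCondition` call used in `TestAutoFlushOnFullBuffer` is usually less flaky than a fixed sleep. A Java sketch of that pattern follows; `WaitSketch` and `waitFor` are hypothetical names, not Ignite test utilities.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper: polls a condition until it holds or a timeout expires. */
final class WaitSketch {
    /** Returns true as soon as the condition holds, false if the timeout expires first. */
    static boolean waitFor(BooleanSupplier cond, long timeoutMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;

        while (true) {
            if (cond.getAsBoolean())
                return true;

            if (System.nanoTime() - deadline >= 0)
                return false;

            try {
                Thread.sleep(10);            // Short pause between polls.
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();   // Preserve the interrupt flag.

                return cond.getAsBoolean();
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();

        // The condition becomes true after roughly 100 ms, well within the 2 s timeout.
        boolean ok = waitFor(() -> System.currentTimeMillis() - start >= 100, 2000);

        System.out.println("condition met: " + ok);
    }
}
```

       The test finishes as soon as the condition holds, and a timeout points at a real problem rather than at an arbitrary delay that happened to be too short.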



