You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/08/31 04:01:28 UTC

[45/50] [abbrv] ignite git commit: IGNITE-1318: Moved platform data streamer to Ignite.

IGNITE-1318: Moved platform data streamer to Ignite.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/16c095a9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/16c095a9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/16c095a9

Branch: refs/heads/ignite-843
Commit: 16c095a9e9a30db630caa8a6ecec98ac5256962e
Parents: 207b682
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Aug 28 15:01:06 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Aug 28 15:01:06 2015 +0300

----------------------------------------------------------------------
 .../processors/platform/PlatformContext.java    |  11 +
 .../datastreamer/PlatformDataStreamer.java      | 222 +++++++++++++++++++
 .../datastreamer/PlatformStreamReceiver.java    | 114 ++++++++++
 3 files changed, 347 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/16c095a9/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
index cbcc91b..9b4a891 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.platform.cache.query.*;
 import org.apache.ignite.internal.processors.platform.callback.*;
 import org.apache.ignite.internal.processors.platform.compute.*;
 import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.stream.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
@@ -249,4 +250,14 @@ public interface PlatformContext {
      * @return Entry filter.
      */
     public PlatformCacheEntryFilter createCacheEntryFilter(Object filter, long ptr);
+
+    /**
+     * Create stream receiver wrapping a native receiver.
+     *
+     * @param rcv Native receiver.
+     * @param ptr Pointer (presumably to the receiver in the native platform; confirm against implementations).
+     * @param keepPortable Keep portable flag.
+     * @return Stream receiver.
+     */
+    public StreamReceiver createStreamReceiver(Object rcv, long ptr, boolean keepPortable);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/16c095a9/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
new file mode 100644
index 0000000..fc9f535
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.datastreamer;
+
+import org.apache.ignite.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.datastreamer.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.internal.util.lang.*;
+
+import java.util.*;
+
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * Interop data streamer wrapper: applies operations read from a portable stream to a {@link DataStreamerImpl}.
+ */
+@SuppressWarnings({"UnusedDeclaration", "unchecked"})
+public class PlatformDataStreamer extends PlatformAbstractTarget {
+    /** Policy: continue. */
+    private static final int PLC_CONTINUE = 0;
+
+    /** Policy: close. */
+    private static final int PLC_CLOSE = 1;
+
+    /** Policy: cancel and close. */
+    private static final int PLC_CANCEL_CLOSE = 2;
+
+    /** Policy: do flush. */
+    private static final int PLC_FLUSH = 3;
+
+    /** Operation: add data and apply the policy (continue, close, cancel-close or flush). */
+    private static final int OP_UPDATE = 1;
+
+    /** Operation: set stream receiver. */
+    private static final int OP_RECEIVER = 2;
+
+    /** Cache name; used to count cache nodes when reporting topology updates. */
+    private final String cacheName;
+
+    /** Data streamer. */
+    private final DataStreamerImpl ldr;
+
+    /** Keep portable flag; passed to the stream receiver created for {@code OP_RECEIVER}. */
+    private final boolean keepPortable;
+
+    /** Topology update event listener; assigned in {@link #listenTopology(long)}. */
+    private volatile GridLocalEventListener lsnr;
+
+    /**
+     * @param platformCtx Context.
+     * @param cacheName Cache name.
+     * @param ldr Data streamer.
+     * @param keepPortable Keep portable flag.
+     */
+    public PlatformDataStreamer(PlatformContext platformCtx, String cacheName, DataStreamerImpl ldr,
+        boolean keepPortable) {
+        super(platformCtx);
+
+        this.cacheName = cacheName;
+        this.ldr = ldr;
+        this.keepPortable = keepPortable;
+    }
+
+    /** {@inheritDoc}  */
+    @Override protected int processInOp(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
+        switch (type) {
+            case OP_UPDATE:
+                int plc = reader.readInt();
+
+                if (plc == PLC_CANCEL_CLOSE) {
+                    // Close with cancel. NOTE(review): lsnr is null unless listenTopology() was called - confirm OK.
+                    platformCtx.kernalContext().event().removeLocalEventListener(lsnr);
+
+                    ldr.close(true);
+                }
+                else {
+                    final long futPtr = reader.readLong();
+
+                    int valsCnt = reader.readInt();
+
+                    if (valsCnt > 0) {
+                        Collection<GridMapEntry> vals = new ArrayList<>(valsCnt);
+
+                        for (int i = 0; i < valsCnt; i++)
+                            vals.add(new GridMapEntry(reader.readObjectDetached(), reader.readObjectDetached()));
+
+                        PlatformFutureUtils.listen(platformCtx, ldr.addData(vals), futPtr,
+                            PlatformFutureUtils.TYP_OBJ);
+                    }
+
+                    if (plc == PLC_CLOSE) {
+                        platformCtx.kernalContext().event().removeLocalEventListener(lsnr);
+
+                        ldr.close(false);
+                    }
+                    else if (plc == PLC_FLUSH)
+                        ldr.tryFlush();
+                    else
+                        assert plc == PLC_CONTINUE;
+                }
+
+                return TRUE;
+
+            case OP_RECEIVER:
+                long ptr = reader.readLong();
+
+                Object rec = reader.readObjectDetached();
+
+                ldr.receiver(platformCtx.createStreamReceiver(rec, ptr, keepPortable));
+
+                return TRUE;
+
+            default:
+                return throwUnsupported(type);
+        }
+    }
+
+    /**
+     * Registers a listener for node join/fail/leave events and immediately reports the current topology.
+     *
+     * @param ptr Pointer passed back to the gateway with every topology update.
+     */
+    public void listenTopology(final long ptr) {
+        lsnr = new GridLocalEventListener() {
+            @Override public void onEvent(Event evt) {
+                DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+                long topVer = discoEvt.topologyVersion();
+                int topSize = platformCtx.kernalContext().discovery().cacheNodes(
+                    cacheName, new AffinityTopologyVersion(topVer)).size();
+
+                platformCtx.gateway().dataStreamerTopologyUpdate(ptr, topVer, topSize);
+            }
+        };
+
+        platformCtx.kernalContext().event().addLocalEventListener(lsnr, EVT_NODE_JOINED, EVT_NODE_FAILED, EVT_NODE_LEFT);
+
+        GridDiscoveryManager discoMgr = platformCtx.kernalContext().discovery();
+
+        long topVer = discoMgr.topologyVersion();
+        int topSize = discoMgr.cacheNodes(cacheName, new AffinityTopologyVersion(topVer)).size();
+
+        platformCtx.gateway().dataStreamerTopologyUpdate(ptr, topVer, topSize);
+    }
+
+    /**
+     * @return Allow-overwrite flag.
+     */
+    public boolean allowOverwrite() {
+        return ldr.allowOverwrite();
+    }
+
+    /**
+     * @param val Allow-overwrite flag.
+     */
+    public void allowOverwrite(boolean val) {
+        ldr.allowOverwrite(val);
+    }
+
+    /**
+     * @return Skip store flag.
+     */
+    public boolean skipStore() {
+        return ldr.skipStore();
+    }
+
+    /**
+     * @param skipStore Skip store flag.
+     */
+    public void skipStore(boolean skipStore) {
+        ldr.skipStore(skipStore);
+    }
+
+    /**
+     * @return Per-node buffer size.
+     */
+    public int perNodeBufferSize() {
+        return ldr.perNodeBufferSize();
+    }
+
+    /**
+     * @param val Per-node buffer size.
+     */
+    public void perNodeBufferSize(int val) {
+        ldr.perNodeBufferSize(val);
+    }
+
+    /**
+     * @return Per-node parallel load operations.
+     */
+    public int perNodeParallelOperations() {
+        return ldr.perNodeParallelOperations();
+    }
+
+    /**
+     * @param val Per-node parallel load operations.
+     */
+    public void perNodeParallelOperations(int val) {
+        ldr.perNodeParallelOperations(val);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/16c095a9/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiver.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiver.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiver.java
new file mode 100644
index 0000000..70bfb6b
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiver.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.datastreamer;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.cache.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.stream.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Interop receiver: forwards received entries to a receiver in the native platform through the gateway.
+ */
+public class PlatformStreamReceiver<K, V> extends PlatformAbstractPredicate implements StreamReceiver<K, V> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Keep portable flag; written and restored by the externalization methods below. */
+    private boolean keepPortable;
+
+    /**
+     * Default constructor (presumably needed for deserialization via {@link #readExternal} - confirm).
+     */
+    public PlatformStreamReceiver()
+    {
+        super();
+    }
+
+    /**
+     * @param pred .Net portable receiver.
+     * @param ptr Pointer to receiver in the native platform.
+     * @param keepPortable Keep portable flag; handed to the {@link PlatformCache} wrapper and to the
+     *      gateway invocation in {@link #receive}.
+     * @param ctx Platform context.
+     */
+    public PlatformStreamReceiver(Object pred, long ptr, boolean keepPortable, PlatformContext ctx) {
+        super(pred, ptr, ctx);
+
+        assert pred != null;
+
+        this.keepPortable = keepPortable;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void receive(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> collection)
+        throws IgniteException {
+        assert ctx != null;
+
+        try (PlatformMemory mem = ctx.memory().allocate()) {
+            PlatformOutputStream out = mem.output();
+
+            PortableRawWriterEx writer = ctx.writer(out);
+
+            writer.writeObject(pred); // Layout: receiver, entry count, then key/value pairs.
+
+            writer.writeInt(collection.size());
+
+            for (Map.Entry<K, V> e : collection) {
+                writer.writeObject(e.getKey());
+                writer.writeObject(e.getValue());
+            }
+
+            out.synchronize();
+
+            ctx.gateway().dataStreamerStreamReceiverInvoke(ptr,
+                new PlatformCache(ctx, cache, keepPortable), mem.pointer(), keepPortable);
+        }
+    }
+
+    /**
+     * @param ignite Injected Ignite instance; the platform context is resolved from it.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    @IgniteInstanceResource
+    public void setIgniteInstance(Ignite ignite) {
+        ctx = PlatformUtils.platformContext(ignite);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+
+        out.writeBoolean(keepPortable);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        super.readExternal(in);
+
+        keepPortable = in.readBoolean();
+    }
+
+}