You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/08/31 04:01:28 UTC
[45/50] [abbrv] ignite git commit: IGNITE-1318: Moved platform data streamer to Ignite.
IGNITE-1318: Moved platform data streamer to Ignite.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/16c095a9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/16c095a9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/16c095a9
Branch: refs/heads/ignite-843
Commit: 16c095a9e9a30db630caa8a6ecec98ac5256962e
Parents: 207b682
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Aug 28 15:01:06 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Aug 28 15:01:06 2015 +0300
----------------------------------------------------------------------
.../processors/platform/PlatformContext.java | 11 +
.../datastreamer/PlatformDataStreamer.java | 222 +++++++++++++++++++
.../datastreamer/PlatformStreamReceiver.java | 114 ++++++++++
3 files changed, 347 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/16c095a9/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
index cbcc91b..9b4a891 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.platform.cache.query.*;
import org.apache.ignite.internal.processors.platform.callback.*;
import org.apache.ignite.internal.processors.platform.compute.*;
import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.stream.*;
import org.jetbrains.annotations.*;
import java.util.*;
@@ -249,4 +250,14 @@ public interface PlatformContext {
* @return Entry filter.
*/
public PlatformCacheEntryFilter createCacheEntryFilter(Object filter, long ptr);
+
+ /**
+ * Create stream receiver wrapping a receiver object that comes from the native platform.
+ *
+ * @param rcv Native receiver.
+ * @param ptr Pointer (presumably to the receiver on the native side - confirm against implementations).
+ * @param keepPortable Keep portable flag.
+ * @return Stream receiver.
+ */
+ public StreamReceiver createStreamReceiver(Object rcv, long ptr, boolean keepPortable);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/16c095a9/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
new file mode 100644
index 0000000..fc9f535
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.datastreamer;
+
+import org.apache.ignite.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.datastreamer.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.internal.util.lang.*;
+
+import java.util.*;
+
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * Interop data streamer wrapper around {@link DataStreamerImpl}, driven through {@code processInOp} operations.
+ */
+@SuppressWarnings({"UnusedDeclaration", "unchecked"})
+public class PlatformDataStreamer extends PlatformAbstractTarget {
+ /** Policy: continue (neither close nor flush after adding data). */
+ private static final int PLC_CONTINUE = 0;
+
+ /** Policy: close without cancel, after any supplied data has been added. */
+ private static final int PLC_CLOSE = 1;
+
+ /** Policy: cancel and close; the rest of the update message is not read. */
+ private static final int PLC_CANCEL_CLOSE = 2;
+
+ /** Policy: do flush (via {@code tryFlush()}) after any supplied data has been added. */
+ private static final int PLC_FLUSH = 3;
+
+ /** Operation: update (add data and/or apply one of the {@code PLC_*} policies). */
+ private static final int OP_UPDATE = 1;
+
+ /** Operation: set stream receiver on the wrapped data streamer. */
+ private static final int OP_RECEIVER = 2;
+
+ /** Cache name; used to look up cache nodes when reporting topology size. */
+ private final String cacheName;
+
+ /** Wrapped data streamer. */
+ private final DataStreamerImpl ldr;
+
+ /** Keep portable flag; passed on when a stream receiver is created. */
+ private final boolean keepPortable;
+
+ /** Topology update event listener; assigned in {@link #listenTopology(long)}, null until then. */
+ private volatile GridLocalEventListener lsnr;
+
+ /**
+ * @param platformCtx Context.
+ * @param cacheName Cache name.
+ * @param ldr Data streamer.
+ * @param keepPortable Keep portable flag.
+ */
+ public PlatformDataStreamer(PlatformContext platformCtx, String cacheName, DataStreamerImpl ldr,
+ boolean keepPortable) {
+ super(platformCtx);
+
+ this.cacheName = cacheName;
+ this.ldr = ldr;
+ this.keepPortable = keepPortable;
+ }
+
+ /** {@inheritDoc} Handles {@code OP_UPDATE} and {@code OP_RECEIVER}; any other type goes to {@code throwUnsupported}. */
+ @Override protected int processInOp(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
+ switch (type) {
+ case OP_UPDATE:
+ int plc = reader.readInt();
+
+ if (plc == PLC_CANCEL_CLOSE) {
+ // Close with cancel. NOTE(review): lsnr is null unless listenTopology() was called - confirm null is tolerated.
+ platformCtx.kernalContext().event().removeLocalEventListener(lsnr);
+
+ ldr.close(true);
+ }
+ else {
+ final long futPtr = reader.readLong(); // Future pointer, handed to PlatformFutureUtils.listen below.
+
+ int valsCnt = reader.readInt();
+
+ if (valsCnt > 0) { // NOTE(review): with no values nothing listens on futPtr - confirm the native side expects that.
+ Collection<GridMapEntry> vals = new ArrayList<>(valsCnt);
+
+ for (int i = 0; i < valsCnt; i++)
+ vals.add(new GridMapEntry(reader.readObjectDetached(), reader.readObjectDetached()));
+
+ PlatformFutureUtils.listen(platformCtx, ldr.addData(vals), futPtr,
+ PlatformFutureUtils.TYP_OBJ);
+ }
+
+ if (plc == PLC_CLOSE) {
+ platformCtx.kernalContext().event().removeLocalEventListener(lsnr);
+
+ ldr.close(false);
+ }
+ else if (plc == PLC_FLUSH)
+ ldr.tryFlush();
+ else
+ assert plc == PLC_CONTINUE;
+ }
+
+ return TRUE;
+
+ case OP_RECEIVER:
+ long ptr = reader.readLong();
+
+ Object rec = reader.readObjectDetached();
+
+ ldr.receiver(platformCtx.createStreamReceiver(rec, ptr, keepPortable));
+
+ return TRUE;
+
+ default:
+ return throwUnsupported(type);
+ }
+ }
+
+ /**
+ * Listen topology changes: registers a node joined/failed/left listener and reports the current topology once.
+ *
+ * @param ptr Pointer passed back to the gateway with every topology update.
+ */
+ public void listenTopology(final long ptr) {
+ lsnr = new GridLocalEventListener() {
+ @Override public void onEvent(Event evt) {
+ DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+ long topVer = discoEvt.topologyVersion();
+ int topSize = platformCtx.kernalContext().discovery().cacheNodes(
+ cacheName, new AffinityTopologyVersion(topVer)).size();
+
+ platformCtx.gateway().dataStreamerTopologyUpdate(ptr, topVer, topSize);
+ }
+ };
+
+ platformCtx.kernalContext().event().addLocalEventListener(lsnr, EVT_NODE_JOINED, EVT_NODE_FAILED, EVT_NODE_LEFT);
+
+ GridDiscoveryManager discoMgr = platformCtx.kernalContext().discovery();
+
+ long topVer = discoMgr.topologyVersion();
+ int topSize = discoMgr.cacheNodes(cacheName, new AffinityTopologyVersion(topVer)).size();
+
+ platformCtx.gateway().dataStreamerTopologyUpdate(ptr, topVer, topSize);
+ }
+
+ /**
+ * @return Allow-overwrite flag.
+ */
+ public boolean allowOverwrite() {
+ return ldr.allowOverwrite();
+ }
+
+ /**
+ * @param val Allow-overwrite flag.
+ */
+ public void allowOverwrite(boolean val) {
+ ldr.allowOverwrite(val);
+ }
+
+ /**
+ * @return Skip store flag.
+ */
+ public boolean skipStore() {
+ return ldr.skipStore();
+ }
+
+ /**
+ * @param skipStore Skip store flag.
+ */
+ public void skipStore(boolean skipStore) {
+ ldr.skipStore(skipStore);
+ }
+
+ /**
+ * @return Per-node buffer size.
+ */
+ public int perNodeBufferSize() {
+ return ldr.perNodeBufferSize();
+ }
+
+ /**
+ * @param val Per-node buffer size.
+ */
+ public void perNodeBufferSize(int val) {
+ ldr.perNodeBufferSize(val);
+ }
+
+ /**
+ * @return Per-node parallel load operations.
+ */
+ public int perNodeParallelOperations() {
+ return ldr.perNodeParallelOperations();
+ }
+
+ /**
+ * @param val Per-node parallel load operations.
+ */
+ public void perNodeParallelOperations(int val) {
+ ldr.perNodeParallelOperations(val);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/16c095a9/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiver.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiver.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiver.java
new file mode 100644
index 0000000..70bfb6b
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiver.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.datastreamer;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.cache.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.stream.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Interop receiver: writes received entries to platform memory and hands them to the gateway.
+ */
+public class PlatformStreamReceiver<K, V> extends PlatformAbstractPredicate implements StreamReceiver<K, V> {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Keep portable flag; passed to the {@link PlatformCache} wrapper and to the gateway call in {@code receive}. */
+ private boolean keepPortable;
+
+ /**
+ * No-arg constructor (presumably used when the receiver is restored through {@code readExternal} - confirm).
+ */
+ public PlatformStreamReceiver()
+ {
+ super();
+ }
+
+ /**
+ * Constructor.
+ * @param pred .Net portable receiver, must not be null.
+ * @param ptr Pointer to receiver in the native platform.
+ * @param keepPortable Keep portable flag.
+ * @param ctx Platform context.
+ */
+ public PlatformStreamReceiver(Object pred, long ptr, boolean keepPortable, PlatformContext ctx) {
+ super(pred, ptr, ctx);
+
+ assert pred != null;
+
+ this.keepPortable = keepPortable;
+ }
+
+ /** {@inheritDoc} Writes the receiver, the entry count and each key/value pair, then invokes the gateway. */
+ @Override public void receive(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> collection)
+ throws IgniteException {
+ assert ctx != null;
+
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+
+ PortableRawWriterEx writer = ctx.writer(out);
+
+ writer.writeObject(pred);
+
+ writer.writeInt(collection.size());
+
+ for (Map.Entry<K, V> e : collection) {
+ writer.writeObject(e.getKey());
+ writer.writeObject(e.getValue());
+ }
+
+ out.synchronize();
+
+ ctx.gateway().dataStreamerStreamReceiverInvoke(ptr,
+ new PlatformCache(ctx, cache, keepPortable), mem.pointer(), keepPortable);
+ }
+ }
+
+ /**
+ * @param ignite Ignite instance; the platform context is resolved from it.
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ @IgniteInstanceResource
+ public void setIgniteInstance(Ignite ignite) {
+ ctx = PlatformUtils.platformContext(ignite);
+ }
+
+ /** {@inheritDoc} Writes the keep portable flag after the parent state. */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ super.writeExternal(out);
+
+ out.writeBoolean(keepPortable);
+ }
+
+ /** {@inheritDoc} Reads the keep portable flag after the parent state. */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ super.readExternal(in);
+
+ keepPortable = in.readBoolean();
+ }
+
+}