You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/09/18 12:04:11 UTC
[07/14] ignite git commit: IGNITE-1513: Merged Java to core module.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java
deleted file mode 100644
index 3895506..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java
+++ /dev/null
@@ -1,621 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform;
-
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cluster.ClusterMetrics;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.events.CacheEvent;
-import org.apache.ignite.events.CacheQueryExecutedEvent;
-import org.apache.ignite.events.CacheQueryReadEvent;
-import org.apache.ignite.events.CacheRebalancingEvent;
-import org.apache.ignite.events.CheckpointEvent;
-import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
-import org.apache.ignite.events.EventAdapter;
-import org.apache.ignite.events.EventType;
-import org.apache.ignite.events.JobEvent;
-import org.apache.ignite.events.SwapSpaceEvent;
-import org.apache.ignite.events.TaskEvent;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.portable.GridPortableMarshaller;
-import org.apache.ignite.internal.portable.PortableMetaDataImpl;
-import org.apache.ignite.internal.portable.PortableRawReaderEx;
-import org.apache.ignite.internal.portable.PortableRawWriterEx;
-import org.apache.ignite.internal.processors.cache.portable.CacheObjectPortableProcessorImpl;
-import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter;
-import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilterImpl;
-import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryProcessor;
-import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryProcessorImpl;
-import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQuery;
-import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter;
-import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryImpl;
-import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryRemoteFilter;
-import org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway;
-import org.apache.ignite.internal.processors.platform.cluster.PlatformClusterNodeFilter;
-import org.apache.ignite.internal.processors.platform.cluster.PlatformClusterNodeFilterImpl;
-import org.apache.ignite.internal.processors.platform.compute.PlatformAbstractTask;
-import org.apache.ignite.internal.processors.platform.compute.PlatformClosureJob;
-import org.apache.ignite.internal.processors.platform.compute.PlatformFullJob;
-import org.apache.ignite.internal.processors.platform.compute.PlatformJob;
-import org.apache.ignite.internal.processors.platform.datastreamer.PlatformStreamReceiver;
-import org.apache.ignite.internal.processors.platform.datastreamer.PlatformStreamReceiverImpl;
-import org.apache.ignite.internal.processors.platform.events.PlatformEventFilterListenerImpl;
-import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
-import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
-import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManager;
-import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManagerImpl;
-import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
-import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
-import org.apache.ignite.internal.processors.platform.messaging.PlatformMessageFilterImpl;
-import org.apache.ignite.internal.processors.platform.utils.PlatformReaderBiClosure;
-import org.apache.ignite.internal.processors.platform.utils.PlatformReaderClosure;
-import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T4;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.portable.PortableMetadata;
-import org.jetbrains.annotations.Nullable;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Implementation of platform context.
- */
-public class PlatformContextImpl implements PlatformContext {
- /** Supported event types. */
- private static final Set<Integer> evtTyps;
-
- /** Kernal context. */
- private final GridKernalContext ctx;
-
- /** Marshaller. */
- private final GridPortableMarshaller marsh;
-
- /** Memory manager. */
- private final PlatformMemoryManagerImpl mem;
-
- /** Callback gateway. */
- private final PlatformCallbackGateway gate;
-
- /** Cache object processor. */
- private final CacheObjectPortableProcessorImpl cacheObjProc;
-
- /** Node ids that has been sent to native platform. */
- private final Set<UUID> sentNodes = Collections.newSetFromMap(new ConcurrentHashMap<UUID, Boolean>());
-
- /**
- * Static initializer.
- */
- static {
- Set<Integer> evtTyps0 = new HashSet<>();
-
- addEventTypes(evtTyps0, EventType.EVTS_CACHE);
- addEventTypes(evtTyps0, EventType.EVTS_CACHE_QUERY);
- addEventTypes(evtTyps0, EventType.EVTS_CACHE_REBALANCE);
- addEventTypes(evtTyps0, EventType.EVTS_CHECKPOINT);
- addEventTypes(evtTyps0, EventType.EVTS_DISCOVERY_ALL);
- addEventTypes(evtTyps0, EventType.EVTS_JOB_EXECUTION);
- addEventTypes(evtTyps0, EventType.EVTS_SWAPSPACE);
- addEventTypes(evtTyps0, EventType.EVTS_TASK_EXECUTION);
-
- evtTyps = Collections.unmodifiableSet(evtTyps0);
- }
-
- /**
- * Adds all elements to a set.
- * @param set Set.
- * @param items Items.
- */
- private static void addEventTypes(Set<Integer> set, int[] items) {
- for (int i : items)
- set.add(i);
- }
-
- /**
- * Constructor.
- *
- * @param ctx Kernal context.
- * @param gate Callback gateway.
- * @param mem Memory manager.
- */
- public PlatformContextImpl(GridKernalContext ctx, PlatformCallbackGateway gate, PlatformMemoryManagerImpl mem) {
- this.ctx = ctx;
- this.gate = gate;
- this.mem = mem;
-
- cacheObjProc = (CacheObjectPortableProcessorImpl)ctx.cacheObjects();
-
- marsh = cacheObjProc.marshaller();
- }
-
- /** {@inheritDoc} */
- @Override public GridKernalContext kernalContext() {
- return ctx;
- }
-
- /** {@inheritDoc} */
- @Override public PlatformMemoryManager memory() {
- return mem;
- }
-
- /** {@inheritDoc} */
- @Override public PlatformCallbackGateway gateway() {
- return gate;
- }
-
- /** {@inheritDoc} */
- @Override public PortableRawReaderEx reader(PlatformMemory mem) {
- return reader(mem.input());
- }
-
- /** {@inheritDoc} */
- @Override public PortableRawReaderEx reader(PlatformInputStream in) {
- return marsh.reader(in);
- }
-
- /** {@inheritDoc} */
- @Override public PortableRawWriterEx writer(PlatformMemory mem) {
- return writer(mem.output());
- }
-
- /** {@inheritDoc} */
- @Override public PortableRawWriterEx writer(PlatformOutputStream out) {
- return marsh.writer(out);
- }
-
- /** {@inheritDoc} */
- @Override public void addNode(ClusterNode node) {
- if (node == null || sentNodes.contains(node.id()))
- return;
-
- // Send node info to the native platform
- try (PlatformMemory mem0 = mem.allocate()) {
- PlatformOutputStream out = mem0.output();
-
- PortableRawWriterEx w = writer(out);
-
- w.writeUuid(node.id());
-
- Map<String, Object> attrs = new HashMap<>(node.attributes());
-
- Iterator<Map.Entry<String, Object>> attrIter = attrs.entrySet().iterator();
-
- while (attrIter.hasNext()) {
- Map.Entry<String, Object> entry = attrIter.next();
-
- Object val = entry.getValue();
-
- if (val != null && !val.getClass().getName().startsWith("java.lang"))
- attrIter.remove();
- }
-
- w.writeMap(attrs);
- w.writeCollection(node.addresses());
- w.writeCollection(node.hostNames());
- w.writeLong(node.order());
- w.writeBoolean(node.isLocal());
- w.writeBoolean(node.isDaemon());
- writeClusterMetrics(w, node.metrics());
-
- out.synchronize();
-
- gateway().nodeInfo(mem0.pointer());
- }
-
- sentNodes.add(node.id());
- }
-
- /** {@inheritDoc} */
- @Override public void writeNode(PortableRawWriterEx writer, ClusterNode node) {
- if (node == null) {
- writer.writeUuid(null);
-
- return;
- }
-
- addNode(node);
-
- writer.writeUuid(node.id());
- }
-
- /** {@inheritDoc} */
- @Override public void writeNodes(PortableRawWriterEx writer, Collection<ClusterNode> nodes) {
- if (nodes == null) {
- writer.writeInt(-1);
-
- return;
- }
-
- writer.writeInt(nodes.size());
-
- for (ClusterNode n : nodes) {
- addNode(n);
-
- writer.writeUuid(n.id());
- }
- }
-
- /** {@inheritDoc} */
- @Override public void writeClusterMetrics(PortableRawWriterEx writer, @Nullable ClusterMetrics metrics) {
- if (metrics == null)
- writer.writeBoolean(false);
- else {
- writer.writeBoolean(true);
-
- writer.writeLong(metrics.getLastUpdateTime());
- writer.writeDate(new Date(metrics.getLastUpdateTime()));
- writer.writeInt(metrics.getMaximumActiveJobs());
- writer.writeInt(metrics.getCurrentActiveJobs());
- writer.writeFloat(metrics.getAverageActiveJobs());
- writer.writeInt(metrics.getMaximumWaitingJobs());
-
- writer.writeInt(metrics.getCurrentWaitingJobs());
- writer.writeFloat(metrics.getAverageWaitingJobs());
- writer.writeInt(metrics.getMaximumRejectedJobs());
- writer.writeInt(metrics.getCurrentRejectedJobs());
- writer.writeFloat(metrics.getAverageRejectedJobs());
-
- writer.writeInt(metrics.getTotalRejectedJobs());
- writer.writeInt(metrics.getMaximumCancelledJobs());
- writer.writeInt(metrics.getCurrentCancelledJobs());
- writer.writeFloat(metrics.getAverageCancelledJobs());
- writer.writeInt(metrics.getTotalCancelledJobs());
-
- writer.writeInt(metrics.getTotalExecutedJobs());
- writer.writeLong(metrics.getMaximumJobWaitTime());
- writer.writeLong(metrics.getCurrentJobWaitTime());
- writer.writeDouble(metrics.getAverageJobWaitTime());
- writer.writeLong(metrics.getMaximumJobExecuteTime());
-
- writer.writeLong(metrics.getCurrentJobExecuteTime());
- writer.writeDouble(metrics.getAverageJobExecuteTime());
- writer.writeInt(metrics.getTotalExecutedTasks());
- writer.writeLong(metrics.getTotalIdleTime());
- writer.writeLong(metrics.getCurrentIdleTime());
-
- writer.writeInt(metrics.getTotalCpus());
- writer.writeDouble(metrics.getCurrentCpuLoad());
- writer.writeDouble(metrics.getAverageCpuLoad());
- writer.writeDouble(metrics.getCurrentGcCpuLoad());
- writer.writeLong(metrics.getHeapMemoryInitialized());
-
- writer.writeLong(metrics.getHeapMemoryUsed());
- writer.writeLong(metrics.getHeapMemoryCommitted());
- writer.writeLong(metrics.getHeapMemoryMaximum());
- writer.writeLong(metrics.getHeapMemoryTotal());
- writer.writeLong(metrics.getNonHeapMemoryInitialized());
-
- writer.writeLong(metrics.getNonHeapMemoryUsed());
- writer.writeLong(metrics.getNonHeapMemoryCommitted());
- writer.writeLong(metrics.getNonHeapMemoryMaximum());
- writer.writeLong(metrics.getNonHeapMemoryTotal());
- writer.writeLong(metrics.getUpTime());
-
- writer.writeDate(new Date(metrics.getStartTime()));
- writer.writeDate(new Date(metrics.getNodeStartTime()));
- writer.writeInt(metrics.getCurrentThreadCount());
- writer.writeInt(metrics.getMaximumThreadCount());
- writer.writeLong(metrics.getTotalStartedThreadCount());
-
- writer.writeInt(metrics.getCurrentDaemonThreadCount());
- writer.writeLong(metrics.getLastDataVersion());
- writer.writeInt(metrics.getSentMessagesCount());
- writer.writeLong(metrics.getSentBytesCount());
- writer.writeInt(metrics.getReceivedMessagesCount());
-
- writer.writeLong(metrics.getReceivedBytesCount());
- writer.writeInt(metrics.getOutboundMessagesQueueSize());
-
- writer.writeInt(metrics.getTotalNodes());
- }
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("ConstantConditions")
- @Override public void processMetadata(PortableRawReaderEx reader) {
- Collection<T4<Integer, String, String, Map<String, Integer>>> metas = PlatformUtils.readCollection(reader,
- new PlatformReaderClosure<T4<Integer, String, String, Map<String, Integer>>>() {
- @Override public T4<Integer, String, String, Map<String, Integer>> read(PortableRawReaderEx reader) {
- int typeId = reader.readInt();
- String typeName = reader.readString();
- String affKey = reader.readString();
-
- Map<String, Integer> fields = PlatformUtils.readMap(reader,
- new PlatformReaderBiClosure<String, Integer>() {
- @Override public IgniteBiTuple<String, Integer> read(PortableRawReaderEx reader) {
- return F.t(reader.readString(), reader.readInt());
- }
- });
-
- return new T4<>(typeId, typeName, affKey, fields);
- }
- }
- );
-
- for (T4<Integer, String, String, Map<String, Integer>> meta : metas)
- cacheObjProc.updateMetaData(meta.get1(), meta.get2(), meta.get3(), meta.get4());
- }
-
- /** {@inheritDoc} */
- @Override public void writeMetadata(PortableRawWriterEx writer, int typeId) {
- writeMetadata0(writer, typeId, cacheObjProc.metadata(typeId));
- }
-
- /** {@inheritDoc} */
- @Override public void writeAllMetadata(PortableRawWriterEx writer) {
- Collection<PortableMetadata> metas = cacheObjProc.metadata();
-
- writer.writeInt(metas.size());
-
- for (org.apache.ignite.portable.PortableMetadata m : metas)
- writeMetadata0(writer, cacheObjProc.typeId(m.typeName()), m);
- }
-
- /**
- * Write portable metadata.
- *
- * @param writer Writer.
- * @param typeId Type id.
- * @param meta Metadata.
- */
- private void writeMetadata0(PortableRawWriterEx writer, int typeId, PortableMetadata meta) {
- if (meta == null)
- writer.writeBoolean(false);
- else {
- writer.writeBoolean(true);
-
- Map<String, String> metaFields = ((PortableMetaDataImpl)meta).fields0();
-
- Map<String, Integer> fields = U.newHashMap(metaFields.size());
-
- for (Map.Entry<String, String> metaField : metaFields.entrySet())
- fields.put(metaField.getKey(), CacheObjectPortableProcessorImpl.fieldTypeId(metaField.getValue()));
-
- writer.writeInt(typeId);
- writer.writeString(meta.typeName());
- writer.writeString(meta.affinityKeyFieldName());
- writer.writeMap(fields);
- }
- }
-
- /** {@inheritDoc} */
- @Override public PlatformContinuousQuery createContinuousQuery(long ptr, boolean hasFilter,
- @Nullable Object filter) {
- return new PlatformContinuousQueryImpl(this, ptr, hasFilter, filter);
- }
-
- /** {@inheritDoc} */
- @Override public PlatformContinuousQueryFilter createContinuousQueryFilter(Object filter) {
- return new PlatformContinuousQueryRemoteFilter(filter);
- }
-
- /** {@inheritDoc} */
- @Override public PlatformMessageFilter createRemoteMessageFilter(Object filter, long ptr) {
- return new PlatformMessageFilterImpl(filter, ptr, this);
- }
-
- /** {@inheritDoc} */
- @Override public boolean isEventTypeSupported(int evtTyp) {
- return evtTyps.contains(evtTyp);
- }
-
- /** {@inheritDoc} */
- @Override public void writeEvent(PortableRawWriterEx writer, Event evt) {
- assert writer != null;
-
- if (evt == null)
- {
- writer.writeInt(-1);
-
- return;
- }
-
- EventAdapter evt0 = (EventAdapter)evt;
-
- if (evt0 instanceof CacheEvent) {
- writer.writeInt(2);
- writeCommonEventData(writer, evt0);
-
- CacheEvent event0 = (CacheEvent)evt0;
-
- writer.writeString(event0.cacheName());
- writer.writeInt(event0.partition());
- writer.writeBoolean(event0.isNear());
- writeNode(writer, event0.eventNode());
- writer.writeObject(event0.key());
- PlatformUtils.writeIgniteUuid(writer, event0.xid());
- writer.writeObject(event0.lockId());
- writer.writeObject(event0.newValue());
- writer.writeObject(event0.oldValue());
- writer.writeBoolean(event0.hasOldValue());
- writer.writeBoolean(event0.hasNewValue());
- writer.writeUuid(event0.subjectId());
- writer.writeString(event0.closureClassName());
- writer.writeString(event0.taskName());
- }
- else if (evt0 instanceof CacheQueryExecutedEvent) {
- writer.writeInt(3);
- writeCommonEventData(writer, evt0);
-
- CacheQueryExecutedEvent event0 = (CacheQueryExecutedEvent)evt0;
-
- writer.writeString(event0.queryType());
- writer.writeString(event0.cacheName());
- writer.writeString(event0.className());
- writer.writeString(event0.clause());
- writer.writeUuid(event0.subjectId());
- writer.writeString(event0.taskName());
- }
- else if (evt0 instanceof CacheQueryReadEvent) {
- writer.writeInt(4);
- writeCommonEventData(writer, evt0);
-
- CacheQueryReadEvent event0 = (CacheQueryReadEvent)evt0;
-
- writer.writeString(event0.queryType());
- writer.writeString(event0.cacheName());
- writer.writeString(event0.className());
- writer.writeString(event0.clause());
- writer.writeUuid(event0.subjectId());
- writer.writeString(event0.taskName());
- writer.writeObject(event0.key());
- writer.writeObject(event0.value());
- writer.writeObject(event0.oldValue());
- writer.writeObject(event0.row());
- }
- else if (evt0 instanceof CacheRebalancingEvent) {
- writer.writeInt(5);
- writeCommonEventData(writer, evt0);
-
- CacheRebalancingEvent event0 = (CacheRebalancingEvent)evt0;
-
- writer.writeString(event0.cacheName());
- writer.writeInt(event0.partition());
- writeNode(writer, event0.discoveryNode());
- writer.writeInt(event0.discoveryEventType());
- writer.writeString(event0.discoveryEventName());
- writer.writeLong(event0.discoveryTimestamp());
- }
- else if (evt0 instanceof CheckpointEvent) {
- writer.writeInt(6);
- writeCommonEventData(writer, evt0);
-
- CheckpointEvent event0 = (CheckpointEvent)evt0;
-
- writer.writeString(event0.key());
- }
- else if (evt0 instanceof DiscoveryEvent) {
- writer.writeInt(7);
- writeCommonEventData(writer, evt0);
-
- DiscoveryEvent event0 = (DiscoveryEvent)evt0;
-
- writeNode(writer, event0.eventNode());
- writer.writeLong(event0.topologyVersion());
-
- writeNodes(writer, event0.topologyNodes());
- }
- else if (evt0 instanceof JobEvent) {
- writer.writeInt(8);
- writeCommonEventData(writer, evt0);
-
- JobEvent event0 = (JobEvent)evt0;
-
- writer.writeString(event0.taskName());
- writer.writeString(event0.taskClassName());
- PlatformUtils.writeIgniteUuid(writer, event0.taskSessionId());
- PlatformUtils.writeIgniteUuid(writer, event0.jobId());
- writeNode(writer, event0.taskNode());
- writer.writeUuid(event0.taskSubjectId());
- }
- else if (evt0 instanceof SwapSpaceEvent) {
- writer.writeInt(9);
- writeCommonEventData(writer, evt0);
-
- SwapSpaceEvent event0 = (SwapSpaceEvent)evt0;
-
- writer.writeString(event0.space());
- }
- else if (evt0 instanceof TaskEvent) {
- writer.writeInt(10);
- writeCommonEventData(writer, evt0);
-
- TaskEvent event0 = (TaskEvent)evt0;
-
- writer.writeString(event0.taskName());
- writer.writeString(event0.taskClassName());
- PlatformUtils.writeIgniteUuid(writer, event0.taskSessionId());
- writer.writeBoolean(event0.internal());
- writer.writeUuid(event0.subjectId());
- }
- else
- throw new IgniteException("Unsupported event: " + evt);
- }
-
- /**
- * Write common event data.
- *
- * @param writer Writer.
- * @param evt Event.
- */
- private void writeCommonEventData(PortableRawWriterEx writer, EventAdapter evt) {
- PlatformUtils.writeIgniteUuid(writer, evt.id());
- writer.writeLong(evt.localOrder());
- writeNode(writer, evt.node());
- writer.writeString(evt.message());
- writer.writeInt(evt.type());
- writer.writeString(evt.name());
- writer.writeDate(new Date(evt.timestamp()));
- }
-
- /** {@inheritDoc} */
- @Override public PlatformEventFilterListener createLocalEventFilter(long hnd) {
- return new PlatformEventFilterListenerImpl(hnd, this);
- }
-
- /** {@inheritDoc} */
- @Override public PlatformEventFilterListener createRemoteEventFilter(Object pred, int... types) {
- return new PlatformEventFilterListenerImpl(pred, types);
- }
-
- /** {@inheritDoc} */
- @Override public PlatformNativeException createNativeException(Object cause) {
- return new PlatformNativeException(cause);
- }
-
- /** {@inheritDoc} */
- @Override public PlatformJob createJob(Object task, long ptr, @Nullable Object job) {
- return new PlatformFullJob(this, (PlatformAbstractTask)task, ptr, job);
- }
-
- /** {@inheritDoc} */
- @Override public PlatformJob createClosureJob(Object task, long ptr, Object job) {
- return new PlatformClosureJob((PlatformAbstractTask)task, ptr, job);
- }
-
- /** {@inheritDoc} */
- @Override public PlatformCacheEntryProcessor createCacheEntryProcessor(Object proc, long ptr) {
- return new PlatformCacheEntryProcessorImpl(proc, ptr);
- }
-
- /** {@inheritDoc} */
- @Override public PlatformCacheEntryFilter createCacheEntryFilter(Object filter, long ptr) {
- return new PlatformCacheEntryFilterImpl(filter, ptr, this);
- }
-
- /** {@inheritDoc} */
- @Override public PlatformStreamReceiver createStreamReceiver(Object rcv, long ptr, boolean keepPortable) {
- return new PlatformStreamReceiverImpl(rcv, ptr, keepPortable, this);
- }
-
- /** {@inheritDoc} */
- @Override public PlatformClusterNodeFilter createClusterNodeFilter(Object filter) {
- return new PlatformClusterNodeFilterImpl(filter, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformIgnition.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformIgnition.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformIgnition.java
deleted file mode 100644
index e642b2d..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformIgnition.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform;
-
-import java.net.URL;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.HashMap;
-import java.util.ServiceLoader;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.Ignition;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgnitionEx;
-import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Entry point for platform nodes.
- */
-@SuppressWarnings("UnusedDeclaration")
-public class PlatformIgnition {
- /** Map with active instances. */
- private static final HashMap<String, PlatformProcessor> instances = new HashMap<>();
-
- /**
- * Start Ignite node in platform mode.
- *
- * @param springCfgPath Spring configuration path.
- * @param gridName Grid name.
- * @param factoryId Factory ID.
- * @param envPtr Environment pointer.
- * @param dataPtr Optional pointer to additional data required for startup.
- * @return Ignite instance.
- */
- public static synchronized PlatformProcessor start(@Nullable String springCfgPath, @Nullable String gridName,
- int factoryId, long envPtr, long dataPtr) {
- if (envPtr <= 0)
- throw new IgniteException("Environment pointer must be positive.");
-
- ClassLoader oldClsLdr = Thread.currentThread().getContextClassLoader();
-
- Thread.currentThread().setContextClassLoader(PlatformProcessor.class.getClassLoader());
-
- try {
- IgniteConfiguration cfg = configuration(springCfgPath);
-
- if (gridName != null)
- cfg.setGridName(gridName);
- else
- gridName = cfg.getGridName();
-
- PlatformBootstrap bootstrap = bootstrap(factoryId);
-
- PlatformProcessor proc = bootstrap.start(cfg, envPtr, dataPtr);
-
- PlatformProcessor old = instances.put(gridName, proc);
-
- assert old == null;
-
- return proc;
- }
- finally {
- Thread.currentThread().setContextClassLoader(oldClsLdr);
- }
- }
-
- /**
- * Get instance by environment pointer.
- *
- * @param gridName Grid name.
- * @return Instance or {@code null} if it doesn't exist (never started or stopped).
- */
- @Nullable public static synchronized PlatformProcessor instance(@Nullable String gridName) {
- return instances.get(gridName);
- }
-
- /**
- * Get environment pointer of the given instance.
- *
- * @param gridName Grid name.
- * @return Environment pointer or {@code 0} in case grid with such name doesn't exist.
- */
- public static synchronized long environmentPointer(@Nullable String gridName) {
- PlatformProcessor proc = instance(gridName);
-
- return proc != null ? proc.environmentPointer() : 0;
- }
-
- /**
- * Stop single instance.
- *
- * @param gridName Grid name,
- * @param cancel Cancel flag.
- * @return {@code True} if instance was found and stopped.
- */
- public static synchronized boolean stop(@Nullable String gridName, boolean cancel) {
- if (Ignition.stop(gridName, cancel)) {
- PlatformProcessor old = instances.remove(gridName);
-
- assert old != null;
-
- return true;
- }
- else
- return false;
- }
-
- /**
- * Stop all instances.
- *
- * @param cancel Cancel flag.
- */
- public static synchronized void stopAll(boolean cancel) {
- for (PlatformProcessor proc : instances.values())
- Ignition.stop(proc.ignite().name(), cancel);
-
- instances.clear();
- }
-
- /**
- * Create configuration.
- *
- * @param springCfgPath Path to Spring XML.
- * @return Configuration.
- */
- private static IgniteConfiguration configuration(@Nullable String springCfgPath) {
- try {
- URL url = springCfgPath == null ? U.resolveIgniteUrl(IgnitionEx.DFLT_CFG) :
- U.resolveSpringUrl(springCfgPath);
-
- IgniteBiTuple<IgniteConfiguration, GridSpringResourceContext> t = IgnitionEx.loadConfiguration(url);
-
- return t.get1();
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException("Failed to instantiate configuration from Spring XML: " + springCfgPath, e);
- }
- }
-
- /**
- * Create bootstrap for the given factory ID.
- *
- * @param factoryId Factory ID.
- * @return Bootstrap.
- */
- private static PlatformBootstrap bootstrap(final int factoryId) {
- PlatformBootstrapFactory factory = AccessController.doPrivileged(
- new PrivilegedAction<PlatformBootstrapFactory>() {
- @Override public PlatformBootstrapFactory run() {
- for (PlatformBootstrapFactory factory : ServiceLoader.load(PlatformBootstrapFactory.class)) {
- if (factory.id() == factoryId)
- return factory;
- }
-
- return null;
- }
- });
-
- if (factory == null)
- throw new IgniteException("Interop factory is not found (did you put into the classpath?): " + factoryId);
-
- return factory.create();
- }
-
- /**
- * Private constructor.
- */
- private PlatformIgnition() {
- // No-op.
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
deleted file mode 100644
index 40b1334..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
+++ /dev/null
@@ -1,360 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform;
-
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.configuration.PlatformConfiguration;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteComputeImpl;
-import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
-import org.apache.ignite.internal.portable.PortableRawWriterEx;
-import org.apache.ignite.internal.processors.GridProcessorAdapter;
-import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
-import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
-import org.apache.ignite.internal.processors.platform.cache.PlatformCache;
-import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinity;
-import org.apache.ignite.internal.processors.platform.cache.store.PlatformCacheStore;
-import org.apache.ignite.internal.processors.platform.cluster.PlatformClusterGroup;
-import org.apache.ignite.internal.processors.platform.compute.PlatformCompute;
-import org.apache.ignite.internal.processors.platform.datastreamer.PlatformDataStreamer;
-import org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetCacheStore;
-import org.apache.ignite.internal.processors.platform.events.PlatformEvents;
-import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
-import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
-import org.apache.ignite.internal.processors.platform.messaging.PlatformMessaging;
-import org.apache.ignite.internal.processors.platform.services.PlatformServices;
-import org.apache.ignite.internal.processors.platform.transactions.PlatformTransactions;
-import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/**
- * GridGain platform processor.
- */
-public class PlatformProcessorImpl extends GridProcessorAdapter implements PlatformProcessor {
- /** Start latch. */
- private final CountDownLatch startLatch = new CountDownLatch(1);
-
- /** Stores pending initialization. */
- private final Collection<StoreInfo> pendingStores =
- Collections.newSetFromMap(new ConcurrentHashMap<StoreInfo, Boolean>());
-
- /** Started stores. */
- private final Collection<PlatformCacheStore> stores =
- Collections.newSetFromMap(new ConcurrentHashMap<PlatformCacheStore, Boolean>());
-
- /** Lock for store lifecycle operations. */
- private final ReadWriteLock storeLock = new ReentrantReadWriteLock();
-
- /** Logger. */
- private final IgniteLogger log;
-
- /** Context. */
- private final PlatformContext platformCtx;
-
- /** Interop configuration. */
- private final PlatformConfigurationEx interopCfg;
-
- /** Whether processor is started. */
- private boolean started;
-
- /** Whether processor if stopped (or stopping). */
- private boolean stopped;
-
- /**
- * Constructor.
- *
- * @param ctx Kernal context.
- */
- public PlatformProcessorImpl(GridKernalContext ctx) {
- super(ctx);
-
- log = ctx.log(PlatformProcessorImpl.class);
-
- PlatformConfiguration interopCfg0 = ctx.config().getPlatformConfiguration();
-
- assert interopCfg0 != null : "Must be checked earlier during component creation.";
-
- if (!(interopCfg0 instanceof PlatformConfigurationEx))
- throw new IgniteException("Unsupported platform configuration: " + interopCfg0.getClass().getName());
-
- interopCfg = (PlatformConfigurationEx)interopCfg0;
-
- if (!F.isEmpty(interopCfg.warnings())) {
- for (String w : interopCfg.warnings())
- U.warn(log, w);
- }
-
- platformCtx = new PlatformContextImpl(ctx, interopCfg.gate(), interopCfg.memory());
- }
-
- /** {@inheritDoc} */
- @Override public void start() throws IgniteCheckedException {
- try (PlatformMemory mem = platformCtx.memory().allocate()) {
- PlatformOutputStream out = mem.output();
-
- PortableRawWriterEx writer = platformCtx.writer(out);
-
- writer.writeString(ctx.gridName());
-
- out.synchronize();
-
- platformCtx.gateway().onStart(this, mem.pointer());
- }
-
- // At this moment all necessary native libraries must be loaded, so we can process with store creation.
- storeLock.writeLock().lock();
-
- try {
- for (StoreInfo store : pendingStores)
- registerStore0(store.store, store.convertPortable);
-
- pendingStores.clear();
-
- started = true;
- }
- finally {
- storeLock.writeLock().unlock();
- }
-
- // Add Interop node attributes.
- ctx.addNodeAttribute(PlatformUtils.ATTR_PLATFORM, interopCfg.platform());
- }
-
- /** {@inheritDoc} */
- @Override public void onKernalStop(boolean cancel) {
- startLatch.countDown();
- }
-
- /** {@inheritDoc} */
- @Override public void stop(boolean cancel) throws IgniteCheckedException {
- if (platformCtx != null) {
- // Destroy cache stores.
- storeLock.writeLock().lock();
-
- try {
- for (PlatformCacheStore store : stores) {
- if (store != null) {
- if (store instanceof PlatformDotNetCacheStore) {
- PlatformDotNetCacheStore store0 = (PlatformDotNetCacheStore)store;
-
- try {
- store0.destroy(platformCtx.kernalContext());
- }
- catch (Exception e) {
- U.error(log, "Failed to destroy .Net cache store [store=" + store0 +
- ", err=" + e.getMessage() + ']');
- }
- }
- else
- assert false : "Invalid interop cache store type: " + store;
- }
- }
- }
- finally {
- stopped = true;
-
- storeLock.writeLock().unlock();
- }
-
- platformCtx.gateway().onStop();
- }
- }
-
- /** {@inheritDoc} */
- @Override public Ignite ignite() {
- return ctx.grid();
- }
-
- /** {@inheritDoc} */
- @Override public long environmentPointer() {
- return platformCtx.gateway().environmentPointer();
- }
-
- /** {@inheritDoc} */
- public void releaseStart() {
- startLatch.countDown();
- }
-
- /** {@inheritDoc} */
- public void awaitStart() throws IgniteCheckedException {
- U.await(startLatch);
- }
-
- /** {@inheritDoc} */
- @Override public PlatformContext context() {
- return platformCtx;
- }
-
- /** {@inheritDoc} */
- @Override public PlatformTarget cache(@Nullable String name) throws IgniteCheckedException {
- IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().cache(name);
-
- if (cache == null)
- throw new IllegalArgumentException("Cache doesn't exist: " + name);
-
- return new PlatformCache(platformCtx, cache.keepPortable(), false);
- }
-
- /** {@inheritDoc} */
- @Override public PlatformTarget createCache(@Nullable String name) throws IgniteCheckedException {
- IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().createCache(name);
-
- assert cache != null;
-
- return new PlatformCache(platformCtx, cache.keepPortable(), false);
- }
-
- /** {@inheritDoc} */
- @Override public PlatformTarget getOrCreateCache(@Nullable String name) throws IgniteCheckedException {
- IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().getOrCreateCache(name);
-
- assert cache != null;
-
- return new PlatformCache(platformCtx, cache.keepPortable(), false);
- }
-
- /** {@inheritDoc} */
- @Override public PlatformTarget affinity(@Nullable String name) throws IgniteCheckedException {
- return new PlatformAffinity(platformCtx, ctx, name);
- }
-
- /** {@inheritDoc} */
- @Override public PlatformTarget dataStreamer(@Nullable String cacheName, boolean keepPortable)
- throws IgniteCheckedException {
- IgniteDataStreamer ldr = ctx.dataStream().dataStreamer(cacheName);
-
- return new PlatformDataStreamer(platformCtx, cacheName, (DataStreamerImpl)ldr, keepPortable);
- }
-
- /** {@inheritDoc} */
- @Override public PlatformTarget transactions() {
- return new PlatformTransactions(platformCtx);
- }
-
- /** {@inheritDoc} */
- @Override public PlatformTarget projection() throws IgniteCheckedException {
- return new PlatformClusterGroup(platformCtx, ctx.grid().cluster());
- }
-
- /** {@inheritDoc} */
- @Override public PlatformTarget compute(PlatformTarget grp) {
- PlatformClusterGroup grp0 = (PlatformClusterGroup)grp;
-
- assert grp0.projection() instanceof ClusterGroupAdapter; // Safety for very complex ClusterGroup hierarchy.
-
- return new PlatformCompute(platformCtx, (IgniteComputeImpl)((ClusterGroupAdapter)grp0.projection()).compute());
- }
-
- /** {@inheritDoc} */
- @Override public PlatformTarget message(PlatformTarget grp) {
- PlatformClusterGroup grp0 = (PlatformClusterGroup)grp;
-
- return new PlatformMessaging(platformCtx, grp0.projection().ignite().message(grp0.projection()));
- }
-
- /** {@inheritDoc} */
- @Override public PlatformTarget events(PlatformTarget grp) {
- PlatformClusterGroup grp0 = (PlatformClusterGroup)grp;
-
- return new PlatformEvents(platformCtx, grp0.projection().ignite().events(grp0.projection()));
- }
-
- /** {@inheritDoc} */
- @Override public PlatformTarget services(PlatformTarget grp) {
- PlatformClusterGroup grp0 = (PlatformClusterGroup)grp;
-
- return new PlatformServices(platformCtx, grp0.projection().ignite().services(grp0.projection()), false);
- }
-
- /** {@inheritDoc} */
- @Override public PlatformTarget extensions() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void registerStore(PlatformCacheStore store, boolean convertPortable)
- throws IgniteCheckedException {
- storeLock.readLock().lock();
-
- try {
- if (stopped)
- throw new IgniteCheckedException("Failed to initialize interop store becuase node is stopping: " +
- store);
-
- if (started)
- registerStore0(store, convertPortable);
- else
- pendingStores.add(new StoreInfo(store, convertPortable));
- }
- finally {
- storeLock.readLock().unlock();
- }
- }
-
- /**
- * Internal store initialization routine.
- *
- * @param store Store.
- * @param convertPortable Convert portable flag.
- * @throws IgniteCheckedException If failed.
- */
- private void registerStore0(PlatformCacheStore store, boolean convertPortable) throws IgniteCheckedException {
- if (store instanceof PlatformDotNetCacheStore) {
- PlatformDotNetCacheStore store0 = (PlatformDotNetCacheStore)store;
-
- store0.initialize(ctx, convertPortable);
- }
- else
- throw new IgniteCheckedException("Unsupported interop store: " + store);
- }
-
- /**
- * Store and manager pair.
- */
- private static class StoreInfo {
- /** Store. */
- private final PlatformCacheStore store;
-
- /** Convert portable flag. */
- private final boolean convertPortable;
-
- /**
- * Constructor.
- *
- * @param store Store.
- * @param convertPortable Convert portable flag.
- */
- private StoreInfo(PlatformCacheStore store, boolean convertPortable) {
- this.store = store;
- this.convertPortable = convertPortable;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
deleted file mode 100644
index ecdfc2c..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
+++ /dev/null
@@ -1,1090 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.cache;
-
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.CacheEntryProcessor;
-import org.apache.ignite.cache.CacheMetrics;
-import org.apache.ignite.cache.CachePartialUpdateException;
-import org.apache.ignite.cache.CachePeekMode;
-import org.apache.ignite.cache.query.Query;
-import org.apache.ignite.cache.query.ScanQuery;
-import org.apache.ignite.cache.query.SqlFieldsQuery;
-import org.apache.ignite.cache.query.SqlQuery;
-import org.apache.ignite.cache.query.TextQuery;
-import org.apache.ignite.internal.portable.PortableRawReaderEx;
-import org.apache.ignite.internal.portable.PortableRawWriterEx;
-import org.apache.ignite.internal.processors.cache.CacheOperationContext;
-import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
-import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
-import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
-import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQuery;
-import org.apache.ignite.internal.processors.platform.cache.query.PlatformFieldsQueryCursor;
-import org.apache.ignite.internal.processors.platform.cache.query.PlatformQueryCursor;
-import org.apache.ignite.internal.processors.platform.PlatformNativeException;
-import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
-import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
-import org.apache.ignite.internal.util.GridConcurrentFactory;
-import org.apache.ignite.internal.util.typedef.C1;
-import org.apache.ignite.lang.IgniteFuture;
-import org.jetbrains.annotations.Nullable;
-
-import javax.cache.Cache;
-import javax.cache.expiry.Duration;
-import javax.cache.expiry.ExpiryPolicy;
-import javax.cache.processor.EntryProcessorException;
-import javax.cache.processor.EntryProcessorResult;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-
-/**
- * Native cache wrapper implementation.
- */
-@SuppressWarnings({"unchecked", "UnusedDeclaration", "TryFinallyCanBeTryWithResources"})
-public class PlatformCache extends PlatformAbstractTarget {
- /** */
- public static final int OP_CLEAR = 1;
-
- /** */
- public static final int OP_CLEAR_ALL = 2;
-
- /** */
- public static final int OP_CONTAINS_KEY = 3;
-
- /** */
- public static final int OP_CONTAINS_KEYS = 4;
-
- /** */
- public static final int OP_GET = 5;
-
- /** */
- public static final int OP_GET_ALL = 6;
-
- /** */
- public static final int OP_GET_AND_PUT = 7;
-
- /** */
- public static final int OP_GET_AND_PUT_IF_ABSENT = 8;
-
- /** */
- public static final int OP_GET_AND_REMOVE = 9;
-
- /** */
- public static final int OP_GET_AND_REPLACE = 10;
-
- /** */
- public static final int OP_GET_NAME = 11;
-
- /** */
- public static final int OP_INVOKE = 12;
-
- /** */
- public static final int OP_INVOKE_ALL = 13;
-
- /** */
- public static final int OP_IS_LOCAL_LOCKED = 14;
-
- /** */
- public static final int OP_LOAD_CACHE = 15;
-
- /** */
- public static final int OP_LOC_EVICT = 16;
-
- /** */
- public static final int OP_LOC_LOAD_CACHE = 17;
-
- /** */
- public static final int OP_LOC_PROMOTE = 18;
-
- /** */
- public static final int OP_LOCAL_CLEAR = 20;
-
- /** */
- public static final int OP_LOCAL_CLEAR_ALL = 21;
-
- /** */
- public static final int OP_LOCK = 22;
-
- /** */
- public static final int OP_LOCK_ALL = 23;
-
- /** */
- public static final int OP_METRICS = 24;
-
- /** */
- private static final int OP_PEEK = 25;
-
- /** */
- private static final int OP_PUT = 26;
-
- /** */
- private static final int OP_PUT_ALL = 27;
-
- /** */
- public static final int OP_PUT_IF_ABSENT = 28;
-
- /** */
- public static final int OP_QRY_CONTINUOUS = 29;
-
- /** */
- public static final int OP_QRY_SCAN = 30;
-
- /** */
- public static final int OP_QRY_SQL = 31;
-
- /** */
- public static final int OP_QRY_SQL_FIELDS = 32;
-
- /** */
- public static final int OP_QRY_TXT = 33;
-
- /** */
- public static final int OP_REMOVE_ALL = 34;
-
- /** */
- public static final int OP_REMOVE_BOOL = 35;
-
- /** */
- public static final int OP_REMOVE_OBJ = 36;
-
- /** */
- public static final int OP_REPLACE_2 = 37;
-
- /** */
- public static final int OP_REPLACE_3 = 38;
-
- /** Underlying JCache. */
- private final IgniteCacheProxy cache;
-
- /** Whether this cache is created with "keepPortable" flag on the other side. */
- private final boolean keepPortable;
-
- /** */
- private static final GetAllWriter WRITER_GET_ALL = new GetAllWriter();
-
- /** */
- private static final EntryProcessorInvokeWriter WRITER_INVOKE = new EntryProcessorInvokeWriter();
-
- /** */
- private static final EntryProcessorInvokeAllWriter WRITER_INVOKE_ALL = new EntryProcessorInvokeAllWriter();
-
- /** Map with currently active locks. */
- private final ConcurrentMap<Long, Lock> lockMap = GridConcurrentFactory.newMap();
-
- /** Lock ID sequence. */
- private static final AtomicLong LOCK_ID_GEN = new AtomicLong();
-
- /**
- * Constructor.
- *
- * @param platformCtx Context.
- * @param cache Underlying cache.
- * @param keepPortable Keep portable flag.
- */
- public PlatformCache(PlatformContext platformCtx, IgniteCache cache, boolean keepPortable) {
- super(platformCtx);
-
- this.cache = (IgniteCacheProxy)cache;
- this.keepPortable = keepPortable;
- }
-
- /**
- * Gets cache with "skip-store" flag set.
- *
- * @return Cache with "skip-store" flag set.
- */
- public PlatformCache withSkipStore() {
- if (cache.delegate().skipStore())
- return this;
-
- return new PlatformCache(platformCtx, cache.withSkipStore(), keepPortable);
- }
-
- /**
- * Gets cache with "keep portable" flag.
- *
- * @return Cache with "keep portable" flag set.
- */
- public PlatformCache withKeepPortable() {
- if (keepPortable)
- return this;
-
- return new PlatformCache(platformCtx, cache.withSkipStore(), true);
- }
-
- /**
- * Gets cache with provided expiry policy.
- *
- * @param create Create.
- * @param update Update.
- * @param access Access.
- * @return Cache.
- */
- public PlatformCache withExpiryPolicy(final long create, final long update, final long access) {
- IgniteCache cache0 = cache.withExpiryPolicy(new InteropExpiryPolicy(create, update, access));
-
- return new PlatformCache(platformCtx, cache0, keepPortable);
- }
-
- /**
- * Gets cache with asynchronous mode enabled.
- *
- * @return Cache with asynchronous mode enabled.
- */
- public PlatformCache withAsync() {
- if (cache.isAsync())
- return this;
-
- return new PlatformCache(platformCtx, (IgniteCache)cache.withAsync(), keepPortable);
- }
-
- /**
- * Gets cache with no-retries mode enabled.
- *
- * @return Cache with no-retries mode enabled.
- */
- public PlatformCache withNoRetries() {
- CacheOperationContext opCtx = cache.operationContext();
-
- if (opCtx != null && opCtx.noRetries())
- return this;
-
- return new PlatformCache(platformCtx, cache.withNoRetries(), keepPortable);
- }
-
- /** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
- switch (type) {
- case OP_PUT:
- cache.put(reader.readObjectDetached(), reader.readObjectDetached());
-
- return TRUE;
-
- case OP_REMOVE_BOOL:
- return cache.remove(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE;
-
- case OP_REMOVE_ALL:
- cache.removeAll(PlatformUtils.readSet(reader));
-
- return TRUE;
-
- case OP_PUT_ALL:
- cache.putAll(PlatformUtils.readMap(reader));
-
- return TRUE;
-
- case OP_LOC_EVICT:
- cache.localEvict(PlatformUtils.readCollection(reader));
-
- return TRUE;
-
- case OP_CONTAINS_KEY:
- return cache.containsKey(reader.readObjectDetached()) ? TRUE : FALSE;
-
- case OP_CONTAINS_KEYS:
- return cache.containsKeys(PlatformUtils.readSet(reader)) ? TRUE : FALSE;
-
- case OP_LOC_PROMOTE: {
- cache.localPromote(PlatformUtils.readSet(reader));
-
- break;
- }
-
- case OP_REPLACE_3:
- return cache.replace(reader.readObjectDetached(), reader.readObjectDetached(),
- reader.readObjectDetached()) ? TRUE : FALSE;
-
- case OP_LOC_LOAD_CACHE:
- loadCache0(reader, true);
-
- break;
-
- case OP_LOAD_CACHE:
- loadCache0(reader, false);
-
- break;
-
- case OP_CLEAR:
- cache.clear(reader.readObjectDetached());
-
- break;
-
- case OP_CLEAR_ALL:
- cache.clearAll(PlatformUtils.readSet(reader));
-
- break;
-
- case OP_LOCAL_CLEAR:
- cache.localClear(reader.readObjectDetached());
-
- break;
-
- case OP_LOCAL_CLEAR_ALL:
- cache.localClearAll(PlatformUtils.readSet(reader));
-
- break;
-
- case OP_PUT_IF_ABSENT: {
- return cache.putIfAbsent(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE;
- }
-
- case OP_REPLACE_2: {
- return cache.replace(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE;
- }
-
- case OP_REMOVE_OBJ: {
- return cache.remove(reader.readObjectDetached()) ? TRUE : FALSE;
- }
-
- case OP_IS_LOCAL_LOCKED:
- return cache.isLocalLocked(reader.readObjectDetached(), reader.readBoolean()) ? TRUE : FALSE;
-
- default:
- return super.processInStreamOutLong(type, reader);
- }
-
- return TRUE;
- }
-
- /**
- * Loads cache via localLoadCache or loadCache.
- */
- private void loadCache0(PortableRawReaderEx reader, boolean loc) throws IgniteCheckedException {
- PlatformCacheEntryFilter filter = null;
-
- Object pred = reader.readObjectDetached();
-
- if (pred != null)
- filter = platformCtx.createCacheEntryFilter(pred, reader.readLong());
-
- Object[] args = reader.readObjectArray();
-
- if (loc)
- cache.localLoadCache(filter, args);
- else
- cache.loadCache(filter, args);
- }
-
- /** {@inheritDoc} */
- @Override protected Object processInStreamOutObject(int type, PortableRawReaderEx reader)
- throws IgniteCheckedException {
- switch (type) {
- case OP_QRY_SQL:
- return runQuery(reader, readSqlQuery(reader));
-
- case OP_QRY_SQL_FIELDS:
- return runFieldsQuery(reader, readFieldsQuery(reader));
-
- case OP_QRY_TXT:
- return runQuery(reader, readTextQuery(reader));
-
- case OP_QRY_SCAN:
- return runQuery(reader, readScanQuery(reader));
-
- case OP_QRY_CONTINUOUS: {
- long ptr = reader.readLong();
- boolean loc = reader.readBoolean();
- boolean hasFilter = reader.readBoolean();
- Object filter = reader.readObjectDetached();
- int bufSize = reader.readInt();
- long timeInterval = reader.readLong();
- boolean autoUnsubscribe = reader.readBoolean();
- Query initQry = readInitialQuery(reader);
-
- PlatformContinuousQuery qry = platformCtx.createContinuousQuery(ptr, hasFilter, filter);
-
- qry.start(cache, loc, bufSize, timeInterval, autoUnsubscribe, initQry);
-
- return qry;
- }
-
- default:
- return super.processInStreamOutObject(type, reader);
- }
- }
-
- /**
- * Read arguments for SQL query.
- *
- * @param reader Reader.
- * @return Arguments.
- */
- @Nullable private Object[] readQueryArgs(PortableRawReaderEx reader) {
- int cnt = reader.readInt();
-
- if (cnt > 0) {
- Object[] args = new Object[cnt];
-
- for (int i = 0; i < cnt; i++)
- args[i] = reader.readObjectDetached();
-
- return args;
- }
- else
- return null;
- }
-
- /** {@inheritDoc} */
- @Override protected void processOutStream(int type, PortableRawWriterEx writer) throws IgniteCheckedException {
- switch (type) {
- case OP_GET_NAME:
- writer.writeObject(cache.getName());
-
- break;
-
- case OP_METRICS:
- CacheMetrics metrics = cache.metrics();
-
- writer.writeLong(metrics.getCacheGets());
- writer.writeLong(metrics.getCachePuts());
- writer.writeLong(metrics.getCacheHits());
- writer.writeLong(metrics.getCacheMisses());
- writer.writeLong(metrics.getCacheTxCommits());
- writer.writeLong(metrics.getCacheTxRollbacks());
- writer.writeLong(metrics.getCacheEvictions());
- writer.writeLong(metrics.getCacheRemovals());
- writer.writeFloat(metrics.getAveragePutTime());
- writer.writeFloat(metrics.getAverageGetTime());
- writer.writeFloat(metrics.getAverageRemoveTime());
- writer.writeFloat(metrics.getAverageTxCommitTime());
- writer.writeFloat(metrics.getAverageTxRollbackTime());
- writer.writeString(metrics.name());
- writer.writeLong(metrics.getOverflowSize());
- writer.writeLong(metrics.getOffHeapEntriesCount());
- writer.writeLong(metrics.getOffHeapAllocatedSize());
- writer.writeInt(metrics.getSize());
- writer.writeInt(metrics.getKeySize());
- writer.writeBoolean(metrics.isEmpty());
- writer.writeInt(metrics.getDhtEvictQueueCurrentSize());
- writer.writeInt(metrics.getTxThreadMapSize());
- writer.writeInt(metrics.getTxXidMapSize());
- writer.writeInt(metrics.getTxCommitQueueSize());
- writer.writeInt(metrics.getTxPrepareQueueSize());
- writer.writeInt(metrics.getTxStartVersionCountsSize());
- writer.writeInt(metrics.getTxCommittedVersionsSize());
- writer.writeInt(metrics.getTxRolledbackVersionsSize());
- writer.writeInt(metrics.getTxDhtThreadMapSize());
- writer.writeInt(metrics.getTxDhtXidMapSize());
- writer.writeInt(metrics.getTxDhtCommitQueueSize());
- writer.writeInt(metrics.getTxDhtPrepareQueueSize());
- writer.writeInt(metrics.getTxDhtStartVersionCountsSize());
- writer.writeInt(metrics.getTxDhtCommittedVersionsSize());
- writer.writeInt(metrics.getTxDhtRolledbackVersionsSize());
- writer.writeBoolean(metrics.isWriteBehindEnabled());
- writer.writeInt(metrics.getWriteBehindFlushSize());
- writer.writeInt(metrics.getWriteBehindFlushThreadCount());
- writer.writeLong(metrics.getWriteBehindFlushFrequency());
- writer.writeInt(metrics.getWriteBehindStoreBatchSize());
- writer.writeInt(metrics.getWriteBehindTotalCriticalOverflowCount());
- writer.writeInt(metrics.getWriteBehindCriticalOverflowCount());
- writer.writeInt(metrics.getWriteBehindErrorRetryCount());
- writer.writeInt(metrics.getWriteBehindBufferSize());
- writer.writeString(metrics.getKeyType());
- writer.writeString(metrics.getValueType());
- writer.writeBoolean(metrics.isStoreByValue());
- writer.writeBoolean(metrics.isStatisticsEnabled());
- writer.writeBoolean(metrics.isManagementEnabled());
- writer.writeBoolean(metrics.isReadThrough());
- writer.writeBoolean(metrics.isWriteThrough());
- writer.writeFloat(metrics.getCacheHitPercentage());
- writer.writeFloat(metrics.getCacheMissPercentage());
-
- break;
-
- default:
- super.processOutStream(type, writer);
- }
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"IfMayBeConditional", "ConstantConditions"})
- @Override protected void processInStreamOutStream(int type, PortableRawReaderEx reader, PortableRawWriterEx writer)
- throws IgniteCheckedException {
- switch (type) {
- case OP_GET: {
- writer.writeObjectDetached(cache.get(reader.readObjectDetached()));
-
- break;
- }
-
- case OP_GET_AND_PUT: {
- writer.writeObjectDetached(cache.getAndPut(reader.readObjectDetached(), reader.readObjectDetached()));
-
- break;
- }
-
- case OP_GET_AND_REPLACE: {
- writer.writeObjectDetached(cache.getAndReplace(reader.readObjectDetached(),
- reader.readObjectDetached()));
-
- break;
- }
-
- case OP_GET_AND_REMOVE: {
- writer.writeObjectDetached(cache.getAndRemove(reader.readObjectDetached()));
-
- break;
- }
-
- case OP_GET_AND_PUT_IF_ABSENT: {
- writer.writeObjectDetached(cache.getAndPutIfAbsent(reader.readObjectDetached(), reader.readObjectDetached()));
-
- break;
- }
-
- case OP_PEEK: {
- Object key = reader.readObjectDetached();
-
- CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes(reader.readInt());
-
- writer.writeObjectDetached(cache.localPeek(key, modes));
-
- break;
- }
-
- case OP_GET_ALL: {
- Set keys = PlatformUtils.readSet(reader);
-
- Map entries = cache.getAll(keys);
-
- PlatformUtils.writeNullableMap(writer, entries);
-
- break;
- }
-
- case OP_INVOKE: {
- Object key = reader.readObjectDetached();
-
- CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0);
-
- try {
- writer.writeObjectDetached(cache.invoke(key, proc));
- }
- catch (EntryProcessorException ex)
- {
- if (ex.getCause() instanceof PlatformNativeException)
- writer.writeObjectDetached(((PlatformNativeException)ex.getCause()).cause());
- else
- throw ex;
- }
-
- break;
- }
-
- case OP_INVOKE_ALL: {
- Set<Object> keys = PlatformUtils.readSet(reader);
-
- CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0);
-
- writeInvokeAllResult(writer, cache.invokeAll(keys, proc));
-
- break;
- }
-
- case OP_LOCK:
- writer.writeLong(registerLock(cache.lock(reader.readObjectDetached())));
-
- break;
-
- case OP_LOCK_ALL:
- writer.writeLong(registerLock(cache.lockAll(PlatformUtils.readCollection(reader))));
-
- break;
-
- default:
- super.processInStreamOutStream(type, reader, writer);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Exception convertException(Exception e) {
- if (e instanceof CachePartialUpdateException)
- return new PlatformCachePartialUpdateException((CachePartialUpdateCheckedException)e.getCause(),
- platformCtx, keepPortable);
-
- if (e instanceof CachePartialUpdateCheckedException)
- return new PlatformCachePartialUpdateException((CachePartialUpdateCheckedException)e, platformCtx, keepPortable);
-
- if (e.getCause() instanceof EntryProcessorException)
- return (EntryProcessorException) e.getCause();
-
- return super.convertException(e);
- }
-
- /**
- * Writes the result of InvokeAll cache method.
- *
- * @param writer Writer.
- * @param results Results.
- */
- private static void writeInvokeAllResult(PortableRawWriterEx writer, Map<Object, EntryProcessorResult> results) {
- if (results == null) {
- writer.writeInt(-1);
-
- return;
- }
-
- writer.writeInt(results.size());
-
- for (Map.Entry<Object, EntryProcessorResult> entry : results.entrySet()) {
- writer.writeObjectDetached(entry.getKey());
-
- EntryProcessorResult procRes = entry.getValue();
-
- try {
- Object res = procRes.get();
-
- writer.writeBoolean(false); // No exception
-
- writer.writeObjectDetached(res);
- }
- catch (Exception ex) {
- writer.writeBoolean(true); // Exception
-
- writeError(writer, ex);
- }
- }
- }
-
- /**
- * Writes an error to the writer either as a native exception, or as a couple of strings.
- * @param writer Writer.
- * @param ex Exception.
- */
- private static void writeError(PortableRawWriterEx writer, Exception ex) {
- if (ex.getCause() instanceof PlatformNativeException)
- writer.writeObjectDetached(((PlatformNativeException)ex.getCause()).cause());
- else {
- writer.writeObjectDetached(ex.getClass().getName());
- writer.writeObjectDetached(ex.getMessage());
- }
- }
-
- /** <inheritDoc /> */
- @Override protected IgniteFuture currentFuture() throws IgniteCheckedException {
- return cache.future();
- }
-
- /** <inheritDoc /> */
- @Nullable @Override protected PlatformFutureUtils.Writer futureWriter(int opId) {
- if (opId == OP_GET_ALL)
- return WRITER_GET_ALL;
-
- if (opId == OP_INVOKE)
- return WRITER_INVOKE;
-
- if (opId == OP_INVOKE_ALL)
- return WRITER_INVOKE_ALL;
-
- return null;
- }
-
- /**
- * Clears the contents of the cache, without notifying listeners or
- * {@ignitelink javax.cache.integration.CacheWriter}s.
- *
- * @throws IllegalStateException if the cache is closed.
- * @throws javax.cache.CacheException if there is a problem during the clear
- */
- public void clear() throws IgniteCheckedException {
- cache.clear();
- }
-
- /**
- * Removes all entries.
- *
- * @throws org.apache.ignite.IgniteCheckedException In case of error.
- */
- public void removeAll() throws IgniteCheckedException {
- cache.removeAll();
- }
-
- /**
- * Read cache size.
- *
- * @param peekModes Encoded peek modes.
- * @param loc Local mode flag.
- * @return Size.
- */
- public int size(int peekModes, boolean loc) {
- CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes(peekModes);
-
- return loc ? cache.localSize(modes) : cache.size(modes);
- }
-
- /**
- * Create cache iterator.
- *
- * @return Cache iterator.
- */
- public PlatformCacheIterator iterator() {
- Iterator<Cache.Entry> iter = cache.iterator();
-
- return new PlatformCacheIterator(platformCtx, iter);
- }
-
- /**
- * Create cache iterator over local entries.
- *
- * @param peekModes Peke modes.
- * @return Cache iterator.
- */
- public PlatformCacheIterator localIterator(int peekModes) {
- CachePeekMode[] peekModes0 = PlatformUtils.decodeCachePeekModes(peekModes);
-
- Iterator<Cache.Entry> iter = cache.localEntries(peekModes0).iterator();
-
- return new PlatformCacheIterator(platformCtx, iter);
- }
-
- /**
- * Enters a lock.
- *
- * @param id Lock id.
- */
- public void enterLock(long id) throws InterruptedException {
- lock(id).lockInterruptibly();
- }
-
- /**
- * Exits a lock.
- *
- * @param id Lock id.
- */
- public void exitLock(long id) {
- lock(id).unlock();
- }
-
- /**
- * Attempts to enter a lock.
- *
- * @param id Lock id.
- * @param timeout Timeout, in milliseconds. -1 for infinite timeout.
- */
- public boolean tryEnterLock(long id, long timeout) throws InterruptedException {
- return timeout == -1
- ? lock(id).tryLock()
- : lock(id).tryLock(timeout, TimeUnit.MILLISECONDS);
- }
-
- /**
- * Rebalances the cache.
- *
- * @param futId Future id.
- */
- public void rebalance(long futId) {
- PlatformFutureUtils.listen(platformCtx, cache.rebalance().chain(new C1<IgniteFuture, Object>() {
- @Override public Object apply(IgniteFuture fut) {
- return null;
- }
- }), futId, PlatformFutureUtils.TYP_OBJ, this);
- }
-
- /**
- * Unregister lock.
- *
- * @param id Lock id.
- */
- public void closeLock(long id){
- Lock lock = lockMap.remove(id);
-
- assert lock != null : "Failed to unregister lock: " + id;
- }
-
- /**
- * Get lock by id.
- *
- * @param id Id.
- * @return Lock.
- */
- private Lock lock(long id) {
- Lock lock = lockMap.get(id);
-
- assert lock != null : "Lock not found for ID: " + id;
-
- return lock;
- }
-
- /**
- * Registers a lock in a map.
- *
- * @param lock Lock to register.
- * @return Registered lock id.
- */
- private long registerLock(Lock lock) {
- long id = LOCK_ID_GEN.incrementAndGet();
-
- lockMap.put(id, lock);
-
- return id;
- }
-
- /**
- * Runs specified query.
- */
- private PlatformQueryCursor runQuery(PortableRawReaderEx reader, Query qry) throws IgniteCheckedException {
-
- try {
- QueryCursorEx cursor = (QueryCursorEx) cache.query(qry);
-
- return new PlatformQueryCursor(platformCtx, cursor,
- qry.getPageSize() > 0 ? qry.getPageSize(): Query.DFLT_PAGE_SIZE);
- }
- catch (Exception err) {
- throw PlatformUtils.unwrapQueryException(err);
- }
- }
-
- /**
- * Runs specified fields query.
- */
- private PlatformFieldsQueryCursor runFieldsQuery(PortableRawReaderEx reader, Query qry)
- throws IgniteCheckedException {
- try {
- QueryCursorEx cursor = (QueryCursorEx) cache.query(qry);
-
- return new PlatformFieldsQueryCursor(platformCtx, cursor,
- qry.getPageSize() > 0 ? qry.getPageSize() : Query.DFLT_PAGE_SIZE);
- }
- catch (Exception err) {
- throw PlatformUtils.unwrapQueryException(err);
- }
- }
-
- /**
- * Reads the query of specified type.
- */
- private Query readInitialQuery(PortableRawReaderEx reader) throws IgniteCheckedException {
- int typ = reader.readInt();
-
- switch (typ) {
- case -1:
- return null;
-
- case OP_QRY_SCAN:
- return readScanQuery(reader);
-
- case OP_QRY_SQL:
- return readSqlQuery(reader);
-
- case OP_QRY_TXT:
- return readTextQuery(reader);
- }
-
- throw new IgniteCheckedException("Unsupported query type: " + typ);
- }
-
- /**
- * Reads sql query.
- */
- private Query readSqlQuery(PortableRawReaderEx reader) {
- boolean loc = reader.readBoolean();
- String sql = reader.readString();
- String typ = reader.readString();
- final int pageSize = reader.readInt();
-
- Object[] args = readQueryArgs(reader);
-
- return new SqlQuery(typ, sql).setPageSize(pageSize).setArgs(args).setLocal(loc);
- }
-
- /**
- * Reads fields query.
- */
- private Query readFieldsQuery(PortableRawReaderEx reader) {
- boolean loc = reader.readBoolean();
- String sql = reader.readString();
- final int pageSize = reader.readInt();
-
- Object[] args = readQueryArgs(reader);
-
- return new SqlFieldsQuery(sql).setPageSize(pageSize).setArgs(args).setLocal(loc);
- }
-
- /**
- * Reads text query.
- */
- private Query readTextQuery(PortableRawReaderEx reader) {
- boolean loc = reader.readBoolean();
- String txt = reader.readString();
- String typ = reader.readString();
- final int pageSize = reader.readInt();
-
- return new TextQuery(typ, txt).setPageSize(pageSize).setLocal(loc);
- }
-
- /**
- * Reads scan query.
- */
- private Query readScanQuery(PortableRawReaderEx reader) {
- boolean loc = reader.readBoolean();
- final int pageSize = reader.readInt();
-
- boolean hasPart = reader.readBoolean();
-
- Integer part = hasPart ? reader.readInt() : null;
-
- ScanQuery qry = new ScanQuery().setPageSize(pageSize);
-
- qry.setPartition(part);
-
- Object pred = reader.readObjectDetached();
-
- if (pred != null)
- qry.setFilter(platformCtx.createCacheEntryFilter(pred, reader.readLong()));
-
- qry.setLocal(loc);
-
- return qry;
- }
-
- /**
- * Writes error with EntryProcessorException cause.
- */
- private static class GetAllWriter implements PlatformFutureUtils.Writer {
- /** <inheritDoc /> */
- @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) {
- assert obj instanceof Map;
-
- PlatformUtils.writeNullableMap(writer, (Map) obj);
- }
-
- /** <inheritDoc /> */
- @Override public boolean canWrite(Object obj, Throwable err) {
- return err == null;
- }
- }
-
- /**
- * Writes error with EntryProcessorException cause.
- */
- private static class EntryProcessorInvokeWriter implements PlatformFutureUtils.Writer {
- /** <inheritDoc /> */
- @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) {
- if (err == null) {
- writer.writeBoolean(false); // No error.
-
- writer.writeObjectDetached(obj);
- }
- else {
- writer.writeBoolean(true); // Error.
-
- writeError(writer, (Exception) err);
- }
- }
-
- /** <inheritDoc /> */
- @Override public boolean canWrite(Object obj, Throwable err) {
- return true;
- }
- }
-
- /**
- * Writes results of InvokeAll method.
- */
- private static class EntryProcessorInvokeAllWriter implements PlatformFutureUtils.Writer {
- /** <inheritDoc /> */
- @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) {
- writeInvokeAllResult(writer, (Map)obj);
- }
-
- /** <inheritDoc /> */
- @Override public boolean canWrite(Object obj, Throwable err) {
- return obj != null && err == null;
- }
- }
-
- /**
- * Interop expiry policy.
- */
- private static class InteropExpiryPolicy implements ExpiryPolicy {
- /** Duration: unchanged. */
- private static final long DUR_UNCHANGED = -2;
-
- /** Duration: eternal. */
- private static final long DUR_ETERNAL = -1;
-
- /** Duration: zero. */
- private static final long DUR_ZERO = 0;
-
- /** Expiry for create. */
- private final Duration create;
-
- /** Expiry for update. */
- private final Duration update;
-
- /** Expiry for access. */
- private final Duration access;
-
- /**
- * Constructor.
- *
- * @param create Expiry for create.
- * @param update Expiry for update.
- * @param access Expiry for access.
- */
- public InteropExpiryPolicy(long create, long update, long access) {
- this.create = convert(create);
- this.update = convert(update);
- this.access = convert(access);
- }
-
- /** {@inheritDoc} */
- @Override public Duration getExpiryForCreation() {
- return create;
- }
-
- /** {@inheritDoc} */
- @Override public Duration getExpiryForUpdate() {
- return update;
- }
-
- /** {@inheritDoc} */
- @Override public Duration getExpiryForAccess() {
- return access;
- }
-
- /**
- * Convert encoded duration to actual duration.
- *
- * @param dur Encoded duration.
- * @return Actual duration.
- */
- private static Duration convert(long dur) {
- if (dur == DUR_UNCHANGED)
- return null;
- else if (dur == DUR_ETERNAL)
- return Duration.ETERNAL;
- else if (dur == DUR_ZERO)
- return Duration.ZERO;
- else {
- assert dur > 0;
-
- return new Duration(TimeUnit.MILLISECONDS, dur);
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java
deleted file mode 100644
index 5f8ec8f..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.cache;
-
-import org.apache.ignite.Ignite;
-import org.apache.ignite.internal.portable.PortableRawWriterEx;
-import org.apache.ignite.internal.processors.platform.PlatformAbstractPredicate;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
-import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
-import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
-import org.apache.ignite.resources.IgniteInstanceResource;
-
-/**
- * Interop filter. Delegates apply to native platform.
- */
-public class PlatformCacheEntryFilterImpl extends PlatformAbstractPredicate implements PlatformCacheEntryFilter {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * {@link java.io.Externalizable} support.
- */
- public PlatformCacheEntryFilterImpl() {
- super();
- }
-
- /**
- * Constructor.
- *
- * @param pred .Net portable predicate.
- * @param ptr Pointer to predicate in the native platform.
- * @param ctx Kernal context.
- */
- public PlatformCacheEntryFilterImpl(Object pred, long ptr, PlatformContext ctx) {
- super(pred, ptr, ctx);
-
- assert pred != null;
- }
-
- /** {@inheritDoc} */
- @Override public boolean apply(Object k, Object v) {
- try (PlatformMemory mem = ctx.memory().allocate()) {
- PlatformOutputStream out = mem.output();
-
- PortableRawWriterEx writer = ctx.writer(out);
-
- writer.writeObject(k);
- writer.writeObject(v);
-
- out.synchronize();
-
- return ctx.gateway().cacheEntryFilterApply(ptr, mem.pointer()) != 0;
- }
- }
-
- /** {@inheritDoc} */
- @Override public void onClose() {
- if (ptr == 0)
- return;
-
- assert ctx != null;
-
- ctx.gateway().cacheEntryFilterDestroy(ptr);
-
- ptr = 0;
- }
-
- /**
- * @param ignite Ignite instance.
- */
- @IgniteInstanceResource
- public void setIgniteInstance(Ignite ignite) {
- ctx = PlatformUtils.platformContext(ignite);
-
- if (ptr != 0)
- return;
-
- try (PlatformMemory mem = ctx.memory().allocate()) {
- PlatformOutputStream out = mem.output();
-
- PortableRawWriterEx writer = ctx.writer(out);
-
- writer.writeObject(pred);
-
- out.synchronize();
-
- ptr = ctx.gateway().cacheEntryFilterCreate(mem.pointer());
- }
- }
-}
\ No newline at end of file