You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/11/05 14:16:05 UTC
[13/26] ignite git commit: Merged IGNITE-950-new into IGNITE-1282
http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 8446665..0afd6bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -49,7 +49,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
@@ -144,6 +143,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
@Override public V getAndPut(K key, V val, @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException {
A.notNull(key, "key", val, "val");
+ CacheOperationContext opCtx = ctx.operationContextPerCall();
+
return (V)updateAllInternal(UPDATE,
Collections.singleton(key),
Collections.singleton(val),
@@ -153,7 +154,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
false,
filter,
ctx.writeThrough(),
- ctx.readThrough());
+ ctx.readThrough(),
+ opCtx != null && opCtx.isKeepBinary());
}
/** {@inheritDoc} */
@@ -164,6 +166,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
long start = statsEnabled ? System.nanoTime() : 0L;
+ CacheOperationContext opCtx = ctx.operationContextPerCall();
+
boolean res = (Boolean)updateAllInternal(UPDATE,
Collections.singleton(key),
Collections.singleton(val),
@@ -173,7 +177,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
false,
filter,
ctx.writeThrough(),
- ctx.readThrough());
+ ctx.readThrough(),
+ opCtx != null && opCtx.isKeepBinary());
if (statsEnabled)
metrics0().addPutTimeNanos(System.nanoTime() - start);
@@ -275,7 +280,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
true,
ctx.equalsValArray(oldVal),
ctx.writeThrough(),
- ctx.readThrough());
+ ctx.readThrough(),
+ ctx.operationContextPerCall().isKeepBinary());
}
/** {@inheritDoc} */
@@ -292,7 +298,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
true,
ctx.equalsValArray(val),
ctx.writeThrough(),
- ctx.readThrough());
+ ctx.readThrough(),
+ ctx.operationContextPerCall().isKeepBinary());
}
/** {@inheritDoc} */
@@ -322,6 +329,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
long start = statsEnabled ? System.nanoTime() : 0L;
+ CacheOperationContext opCtx = ctx.operationContextPerCall();
+
updateAllInternal(UPDATE,
m.keySet(),
m.values(),
@@ -331,7 +340,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
false,
CU.empty0(),
ctx.writeThrough(),
- ctx.readThrough());
+ ctx.readThrough(),
+ opCtx != null && opCtx.isKeepBinary());
if (statsEnabled)
metrics0().addPutTimeNanos(System.nanoTime() - start);
@@ -350,6 +360,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public V getAndRemove(K key) throws IgniteCheckedException {
+ CacheOperationContext opCtx = ctx.operationContextPerCall();
+
return (V)updateAllInternal(DELETE,
Collections.singleton(key),
null,
@@ -359,7 +371,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
false,
CU.empty0(),
ctx.writeThrough(),
- ctx.readThrough());
+ ctx.readThrough(),
+ opCtx != null && opCtx.isKeepBinary());
}
/** {@inheritDoc} */
@@ -371,6 +384,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void removeAll(Collection<? extends K> keys) throws IgniteCheckedException {
+ CacheOperationContext opCtx = ctx.operationContextPerCall();
+
updateAllInternal(DELETE,
keys,
null,
@@ -380,7 +395,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
false,
CU.empty0(),
ctx.writeThrough(),
- ctx.readThrough());
+ ctx.readThrough(),
+ opCtx != null && opCtx.isKeepBinary());
}
/** {@inheritDoc} */
@@ -397,6 +413,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
A.notNull(key, "key");
+ CacheOperationContext opCtx = ctx.operationContextPerCall();
+
Boolean rmv = (Boolean)updateAllInternal(DELETE,
Collections.singleton(key),
null,
@@ -406,7 +424,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
false,
CU.empty0(),
ctx.writeThrough(),
- ctx.readThrough());
+ ctx.readThrough(),
+ opCtx != null && opCtx.isKeepBinary());
if (statsEnabled && rmv)
metrics0().addRemoveTimeNanos(System.nanoTime() - start);
@@ -426,6 +445,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
@Override public boolean remove(K key, V val) throws IgniteCheckedException {
A.notNull(key, "key", val, "val");
+ CacheOperationContext opCtx = ctx.operationContextPerCall();
+
return (Boolean)updateAllInternal(DELETE,
Collections.singleton(key),
null,
@@ -435,7 +456,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
false,
ctx.equalsValArray(val),
ctx.writeThrough(),
- ctx.readThrough());
+ ctx.readThrough(),
+ opCtx != null && opCtx.isKeepBinary());
}
/** {@inheritDoc} */
@@ -575,7 +597,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
subjId,
null,
taskName,
- expiry);
+ expiry,
+ !deserializePortable);
if (v != null)
ctx.addResult(vals, cacheKey, v, skipVals, false, deserializePortable, true);
@@ -644,6 +667,10 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
}
});
+ CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+ final boolean keepPortable = opCtx != null && opCtx.isKeepBinary();
+
return (Map<K, EntryProcessorResult<T>>)updateAllInternal(TRANSFORM,
invokeMap.keySet(),
invokeMap.values(),
@@ -653,7 +680,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
false,
null,
ctx.writeThrough(),
- ctx.readThrough());
+ ctx.readThrough(),
+ keepPortable);
}
/** {@inheritDoc} */
@@ -727,6 +755,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
if (keyCheck)
validateCacheKeys(map.keySet());
+ CacheOperationContext opCtx = ctx.operationContextPerCall();
+
return (Map<K, EntryProcessorResult<T>>)updateAllInternal(TRANSFORM,
map.keySet(),
map.values(),
@@ -736,7 +766,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
false,
null,
ctx.writeThrough(),
- ctx.readThrough());
+ ctx.readThrough(),
+ opCtx != null && opCtx.isKeepBinary());
}
/** {@inheritDoc} */
@@ -787,8 +818,12 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
final boolean readThrough = ctx.readThrough();
+ CacheOperationContext opCtx = ctx.operationContextPerCall();
+
final ExpiryPolicy expiry = expiryPerCall();
+ final boolean keepPortable = opCtx != null && opCtx.isKeepBinary();
+
IgniteInternalFuture fut = asyncOp(new Callable<Object>() {
@Override public Object call() throws Exception {
return updateAllInternal(op,
@@ -800,7 +835,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
rawRetval,
filter,
writeThrough,
- readThrough);
+ readThrough,
+ keepPortable);
}
});
@@ -835,6 +871,10 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
final ExpiryPolicy expiryPlc = expiryPerCall();
+ CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+ final boolean keepPortable = opCtx != null && opCtx.isKeepBinary();
+
IgniteInternalFuture fut = asyncOp(new Callable<Object>() {
@Override public Object call() throws Exception {
return updateAllInternal(DELETE,
@@ -846,7 +886,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
rawRetval,
filter,
writeThrough,
- readThrough);
+ readThrough,
+ keepPortable);
}
});
@@ -882,7 +923,9 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
boolean rawRetval,
CacheEntryPredicate[] filter,
boolean writeThrough,
- boolean readThrough) throws IgniteCheckedException {
+ boolean readThrough,
+ boolean keepPortable
+ ) throws IgniteCheckedException {
if (keyCheck)
validateCacheKeys(keys);
@@ -905,6 +948,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
expiryPlc,
ver,
filter,
+ keepPortable,
subjId,
taskName);
}
@@ -945,6 +989,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
writeThrough,
readThrough,
retval,
+ keepPortable,
expiryPlc,
true,
true,
@@ -999,7 +1044,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
if (err != null)
throw err;
- Object ret = res == null ? null : rawRetval ? new GridCacheReturn(ctx, true, res.get2(), res.get1()) :
+ Object ret = res == null ? null : rawRetval ? new GridCacheReturn(ctx, true, keepPortable, res.get2(), res.get1()) :
(retval || op == TRANSFORM) ? res.get2() : res.get1();
if (op == TRANSFORM && ret == null)
@@ -1032,6 +1077,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
@Nullable ExpiryPolicy expiryPlc,
GridCacheVersion ver,
@Nullable CacheEntryPredicate[] filter,
+ boolean keepPortable,
UUID subjId,
String taskName
) throws IgniteCheckedException {
@@ -1099,12 +1145,13 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
subjId,
entryProcessor,
taskName,
- null);
+ null,
+ keepPortable);
Object oldVal = null;
CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(ctx, entry.key(), old,
- entry.version());
+ entry.version(), keepPortable);
CacheObject updated;
Object updatedVal = null;
@@ -1133,7 +1180,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
if (intercept) {
IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor()
.onBeforeRemove(new CacheLazyEntry(ctx, entry.key(), invokeEntry.key(),
- old, oldVal));
+ old, oldVal, keepPortable));
if (ctx.cancelRemove(interceptorRes))
continue;
@@ -1148,6 +1195,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
putMap,
null,
expiryPlc,
+ keepPortable,
err,
subjId,
taskName);
@@ -1168,7 +1216,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
if (intercept) {
Object interceptorVal = ctx.config().getInterceptor()
.onBeforePut(new CacheLazyEntry(ctx, entry.key(), invokeEntry.getKey(),
- old, oldVal), updatedVal);
+ old, oldVal, keepPortable), updatedVal);
if (interceptorVal == null)
continue;
@@ -1185,6 +1233,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
null,
rmvKeys,
expiryPlc,
+ keepPortable,
err,
subjId,
taskName);
@@ -1218,10 +1267,11 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
subjId,
null,
taskName,
- null);
+ null,
+ keepPortable);
Object interceptorVal = ctx.config().getInterceptor().onBeforePut(new CacheLazyEntry(
- ctx, entry.key(), old), val);
+ ctx, entry.key(), old, keepPortable), val);
if (interceptorVal == null)
continue;
@@ -1252,10 +1302,11 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
subjId,
null,
taskName,
- null);
+ null,
+ keepPortable);
IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor()
- .onBeforeRemove(new CacheLazyEntry(ctx, entry.key(), old));
+ .onBeforeRemove(new CacheLazyEntry(ctx, entry.key(), old, keepPortable));
if (ctx.cancelRemove(interceptorRes))
continue;
@@ -1289,6 +1340,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
putMap,
rmvKeys,
expiryPlc,
+ keepPortable,
err,
subjId,
taskName);
@@ -1326,6 +1378,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
@Nullable Map<Object, Object> putMap,
@Nullable Collection<Object> rmvKeys,
@Nullable ExpiryPolicy expiryPlc,
+ boolean keepPortable,
@Nullable CachePartialUpdateCheckedException err,
UUID subjId,
String taskName
@@ -1395,6 +1448,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
false,
false,
false,
+ keepPortable,
expiryPlc,
true,
true,
@@ -1405,9 +1459,9 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
if (intercept) {
if (op == UPDATE)
- ctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(ctx, entry.key(), writeVal));
+ ctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(ctx, entry.key(), writeVal, keepPortable));
else
- ctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(ctx, entry.key(), t.get2()));
+ ctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(ctx, entry.key(), t.get2(), keepPortable));
}
}
catch (GridCacheEntryRemovedException ignore) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheDefaultPortableAffinityKeyMapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheDefaultPortableAffinityKeyMapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheDefaultPortableAffinityKeyMapper.java
index 23edd9e..1a2ef7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheDefaultPortableAffinityKeyMapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheDefaultPortableAffinityKeyMapper.java
@@ -21,7 +21,7 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.portable.PortableObject;
+import org.apache.ignite.binary.BinaryObject;
/**
*
@@ -34,7 +34,7 @@ public class CacheDefaultPortableAffinityKeyMapper extends GridCacheDefaultAffin
@Override public Object affinityKey(Object key) {
IgniteKernal kernal = (IgniteKernal)ignite;
- CacheObjectPortableProcessorImpl proc = (CacheObjectPortableProcessorImpl)kernal.context().cacheObjects();
+ CacheObjectBinaryProcessorImpl proc = (CacheObjectBinaryProcessorImpl)kernal.context().cacheObjects();
try {
key = proc.toPortable(key);
@@ -43,8 +43,8 @@ public class CacheDefaultPortableAffinityKeyMapper extends GridCacheDefaultAffin
U.error(log, "Failed to marshal key to portable: " + key, e);
}
- if (key instanceof PortableObject)
- return proc.affinityKey((PortableObject)key);
+ if (key instanceof BinaryObject)
+ return proc.affinityKey((BinaryObject)key);
else
return super.affinityKey(key);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessor.java
new file mode 100644
index 0000000..7178a94
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessor.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.portable;
+
+import java.util.Collection;
+import java.util.Map;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteBinary;
+import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.binary.BinaryObject;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Extended cache object processor interface with additional methods for binary.
+ */
+public interface CacheObjectBinaryProcessor extends IgniteCacheObjectProcessor {
+ /**
+ * @param typeId Type ID.
+ * @return Builder.
+ */
+ public BinaryObjectBuilder builder(int typeId);
+
+ /**
+ * @param clsName Class name.
+ * @return Builder.
+ */
+ public BinaryObjectBuilder builder(String clsName);
+
+ /**
+ * Creates builder initialized by existing portable object.
+ *
+ * @param portableObj Portable object to edit.
+ * @return Portable builder.
+ */
+ public BinaryObjectBuilder builder(BinaryObject portableObj);
+
+ /**
+ * @param typeId Type ID.
+ * @param newMeta New meta data.
+ * @throws IgniteException In case of error.
+ */
+ public void addMeta(int typeId, final BinaryType newMeta) throws IgniteException;
+
+ /**
+ * @param typeId Type ID.
+ * @param typeName Type name.
+ * @param affKeyFieldName Affinity key field name.
+ * @param fieldTypeIds Fields map.
+ * @throws IgniteException In case of error.
+ */
+ public void updateMetaData(int typeId, String typeName, @Nullable String affKeyFieldName,
+ Map<String, Integer> fieldTypeIds) throws IgniteException;
+
+ /**
+ * @param typeId Type ID.
+ * @return Meta data.
+ * @throws IgniteException In case of error.
+ */
+ @Nullable public BinaryType metadata(int typeId) throws IgniteException;
+
+ /**
+ * @param typeIds Type ID.
+ * @return Meta data.
+ * @throws IgniteException In case of error.
+ */
+ public Map<Integer, BinaryType> metadata(Collection<Integer> typeIds) throws IgniteException;
+
+ /**
+ * @return Metadata for all types.
+ * @throws IgniteException In case of error.
+ */
+ public Collection<BinaryType> metadata() throws IgniteException;
+
+ /**
+ * @return Portables interface.
+ * @throws IgniteException If failed.
+ */
+ public IgniteBinary binary() throws IgniteException;
+
+ /**
+ * @param obj Original object.
+ * @return Portable object (in case portable marshaller is used).
+ * @throws IgniteException If failed.
+ */
+ public Object marshalToPortable(Object obj) throws IgniteException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java
new file mode 100644
index 0000000..df8e7f9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java
@@ -0,0 +1,1046 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.portable;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.event.EventType;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.IgniteBinary;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.portable.GridPortableMarshaller;
+import org.apache.ignite.internal.portable.PortableContext;
+import org.apache.ignite.internal.portable.PortableMetaDataHandler;
+import org.apache.ignite.internal.portable.BinaryMetaDataImpl;
+import org.apache.ignite.internal.portable.BinaryObjectImpl;
+import org.apache.ignite.internal.portable.BinaryObjectOffheapImpl;
+import org.apache.ignite.internal.portable.PortableUtils;
+import org.apache.ignite.internal.portable.builder.BinaryObjectBuilderImpl;
+import org.apache.ignite.internal.portable.streams.PortableInputStream;
+import org.apache.ignite.internal.portable.streams.PortableOffheapInputStream;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicateAdapter;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheUtils;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.query.CacheQuery;
+import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
+import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessorImpl;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.lang.GridMapEntry;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.portable.PortableMarshaller;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.binary.BinaryObject;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+import sun.misc.Unsafe;
+
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.BOOLEAN;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.BOOLEAN_ARR;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.BYTE;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.BYTE_ARR;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.CHAR;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.CHAR_ARR;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.CLASS;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.COL;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.DATE;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.DATE_ARR;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.DECIMAL;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.DECIMAL_ARR;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.DOUBLE;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.DOUBLE_ARR;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.ENUM;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.ENUM_ARR;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.FLOAT;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.FLOAT_ARR;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.INT;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.INT_ARR;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.LONG;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.LONG_ARR;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.MAP;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.MAP_ENTRY;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.OBJ;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.OBJ_ARR;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.PORTABLE_OBJ;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.SHORT;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.SHORT_ARR;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.STRING;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.STRING_ARR;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.TIMESTAMP;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.TIMESTAMP_ARR;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.UUID;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.UUID_ARR;
+
+/**
+ * Portable processor implementation.
+ */
+public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorImpl implements
+ CacheObjectBinaryProcessor {
+ /** */
+ public static final String[] FIELD_TYPE_NAMES;
+
+ /** */
+ private static final Unsafe UNSAFE = GridUnsafe.unsafe();
+
+ /** */
+ private final CountDownLatch startLatch = new CountDownLatch(1);
+
+ /** */
+ private final boolean clientNode;
+
+ /** */
+ private volatile IgniteCacheProxy<PortableMetaDataKey, BinaryType> metaDataCache;
+
+ /** */
+ private final ConcurrentHashMap8<PortableMetaDataKey, BinaryType> clientMetaDataCache;
+
+ /** Predicate to filter portable meta data in utility cache. */
+ private final CacheEntryPredicate metaPred = new CacheEntryPredicateAdapter() {
+ private static final long serialVersionUID = 0L;
+
+ @Override public boolean apply(GridCacheEntryEx e) {
+ return e.key().value(e.context().cacheObjectContext(), false) instanceof PortableMetaDataKey;
+ }
+ };
+
+ /** */
+ private PortableContext portableCtx;
+
+ /** */
+ private Marshaller marsh;
+
+ /** */
+ private GridPortableMarshaller portableMarsh;
+
+ /** */
+ @GridToStringExclude
+ private IgniteBinary portables;
+
+ /** Metadata updates collected before metadata cache is initialized. */
+ private final Map<Integer, BinaryType> metaBuf = new ConcurrentHashMap<>();
+
+ /** */
+ private UUID metaCacheQryId;
+
+ /**
+ *
+ */
+ static {
+ FIELD_TYPE_NAMES = new String[104];
+
+ FIELD_TYPE_NAMES[BYTE] = "byte";
+ FIELD_TYPE_NAMES[SHORT] = "short";
+ FIELD_TYPE_NAMES[INT] = "int";
+ FIELD_TYPE_NAMES[LONG] = "long";
+ FIELD_TYPE_NAMES[BOOLEAN] = "boolean";
+ FIELD_TYPE_NAMES[FLOAT] = "float";
+ FIELD_TYPE_NAMES[DOUBLE] = "double";
+ FIELD_TYPE_NAMES[CHAR] = "char";
+ FIELD_TYPE_NAMES[UUID] = "UUID";
+ FIELD_TYPE_NAMES[DECIMAL] = "decimal";
+ FIELD_TYPE_NAMES[STRING] = "String";
+ FIELD_TYPE_NAMES[DATE] = "Date";
+ FIELD_TYPE_NAMES[TIMESTAMP] = "Timestamp";
+ FIELD_TYPE_NAMES[ENUM] = "Enum";
+ FIELD_TYPE_NAMES[OBJ] = "Object";
+ FIELD_TYPE_NAMES[PORTABLE_OBJ] = "Object";
+ FIELD_TYPE_NAMES[COL] = "Collection";
+ FIELD_TYPE_NAMES[MAP] = "Map";
+ FIELD_TYPE_NAMES[MAP_ENTRY] = "Entry";
+ FIELD_TYPE_NAMES[CLASS] = "Class";
+ FIELD_TYPE_NAMES[BYTE_ARR] = "byte[]";
+ FIELD_TYPE_NAMES[SHORT_ARR] = "short[]";
+ FIELD_TYPE_NAMES[INT_ARR] = "int[]";
+ FIELD_TYPE_NAMES[LONG_ARR] = "long[]";
+ FIELD_TYPE_NAMES[BOOLEAN_ARR] = "boolean[]";
+ FIELD_TYPE_NAMES[FLOAT_ARR] = "float[]";
+ FIELD_TYPE_NAMES[DOUBLE_ARR] = "double[]";
+ FIELD_TYPE_NAMES[CHAR_ARR] = "char[]";
+ FIELD_TYPE_NAMES[UUID_ARR] = "UUID[]";
+ FIELD_TYPE_NAMES[DECIMAL_ARR] = "decimal[]";
+ FIELD_TYPE_NAMES[STRING_ARR] = "String[]";
+ FIELD_TYPE_NAMES[DATE_ARR] = "Date[]";
+ FIELD_TYPE_NAMES[TIMESTAMP_ARR] = "Timestamp[]";
+ FIELD_TYPE_NAMES[OBJ_ARR] = "Object[]";
+ FIELD_TYPE_NAMES[ENUM_ARR] = "Enum[]";
+ }
+
+ /**
+ * @param typeName Field type name.
+ * @return Field type ID;
+ */
+ @SuppressWarnings("StringEquality")
+ public static int fieldTypeId(String typeName) {
+ for (int i = 0; i < FIELD_TYPE_NAMES.length; i++) {
+ String typeName0 = FIELD_TYPE_NAMES[i];
+
+ if (typeName.equals(typeName0))
+ return i;
+ }
+
+ throw new IllegalArgumentException("Invalid metadata type name: " + typeName);
+ }
+
+ /**
+ * @param typeId Field type ID.
+ * @return Field type name.
+ */
+ public static String fieldTypeName(int typeId) {
+ assert typeId >= 0 && typeId < FIELD_TYPE_NAMES.length : typeId;
+
+ String typeName = FIELD_TYPE_NAMES[typeId];
+
+ assert typeName != null : typeId;
+
+ return typeName;
+ }
+
+ /**
+ * @param typeIds Field type IDs.
+ * @return Field type names.
+ */
+ public static Map<String, String> fieldTypeNames(Map<String, Integer> typeIds) {
+ Map<String, String> names = U.newHashMap(typeIds.size());
+
+ for (Map.Entry<String, Integer> e : typeIds.entrySet())
+ names.put(e.getKey(), fieldTypeName(e.getValue()));
+
+ return names;
+ }
+
+ /**
+ * @param ctx Kernal context.
+ */
+ public CacheObjectBinaryProcessorImpl(GridKernalContext ctx) {
+ super(ctx);
+
+ marsh = ctx.grid().configuration().getMarshaller();
+
+ clientNode = this.ctx.clientNode();
+
+ clientMetaDataCache = clientNode ? new ConcurrentHashMap8<PortableMetaDataKey, BinaryType>() : null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteCheckedException {
+ if (marsh instanceof PortableMarshaller) {
+ PortableMetaDataHandler metaHnd = new PortableMetaDataHandler() {
+ @Override public void addMeta(int typeId, BinaryType newMeta)
+ throws BinaryObjectException {
+ if (metaDataCache == null) {
+ BinaryType oldMeta = metaBuf.get(typeId);
+
+ if (oldMeta == null || checkMeta(typeId, oldMeta, newMeta, null)) {
+ synchronized (this) {
+ Map<String, String> fields = new HashMap<>();
+
+ if (checkMeta(typeId, oldMeta, newMeta, fields)) {
+ newMeta = new BinaryMetaDataImpl(newMeta.typeName(),
+ fields,
+ newMeta.affinityKeyFieldName());
+
+ metaBuf.put(typeId, newMeta);
+ }
+ else
+ return;
+ }
+
+ if (metaDataCache == null)
+ return;
+ else
+ metaBuf.remove(typeId);
+ }
+ else
+ return;
+ }
+
+ CacheObjectBinaryProcessorImpl.this.addMeta(typeId, newMeta);
+ }
+
+ @Override public BinaryType metadata(int typeId) throws BinaryObjectException {
+ if (metaDataCache == null)
+ U.awaitQuiet(startLatch);
+
+ return CacheObjectBinaryProcessorImpl.this.metadata(typeId);
+ }
+ };
+
+ PortableMarshaller pMarh0 = (PortableMarshaller)marsh;
+
+ portableCtx = new PortableContext(metaHnd, ctx.config());
+
+ IgniteUtils.invoke(PortableMarshaller.class, pMarh0, "setPortableContext", portableCtx);
+
+ portableMarsh = new GridPortableMarshaller(portableCtx);
+
+ portables = new IgniteBinaryImpl(ctx, this);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void onUtilityCacheStarted() throws IgniteCheckedException {
+ metaDataCache = ctx.cache().jcache(CU.UTILITY_CACHE_NAME);
+
+ if (clientNode) {
+ assert !metaDataCache.context().affinityNode();
+
+ metaCacheQryId = metaDataCache.context().continuousQueries().executeInternalQuery(
+ new MetaDataEntryListener(),
+ new MetaDataEntryFilter(),
+ false,
+ true);
+
+ while (true) {
+ ClusterNode oldestSrvNode =
+ CU.oldestAliveCacheServerNode(ctx.cache().context(), AffinityTopologyVersion.NONE);
+
+ if (oldestSrvNode == null)
+ break;
+
+ GridCacheQueryManager qryMgr = metaDataCache.context().queries();
+
+ CacheQuery<Map.Entry<PortableMetaDataKey, BinaryType>> qry =
+ qryMgr.createScanQuery(new MetaDataPredicate(), null, false);
+
+ qry.keepAll(false);
+
+ qry.projection(ctx.cluster().get().forNode(oldestSrvNode));
+
+ try {
+ CacheQueryFuture<Map.Entry<PortableMetaDataKey, BinaryType>> fut = qry.execute();
+
+ Map.Entry<PortableMetaDataKey, BinaryType> next;
+
+ while ((next = fut.next()) != null) {
+ assert next.getKey() != null : next;
+ assert next.getValue() != null : next;
+
+ addClientCacheMetaData(next.getKey(), next.getValue());
+ }
+ }
+ catch (IgniteCheckedException e) {
+ if (!ctx.discovery().alive(oldestSrvNode) || !ctx.discovery().pingNode(oldestSrvNode.id()))
+ continue;
+ else
+ throw e;
+ }
+ catch (CacheException e) {
+ if (X.hasCause(e, ClusterTopologyCheckedException.class, ClusterTopologyException.class))
+ continue;
+ else
+ throw e;
+ }
+
+ break;
+ }
+ }
+
+ startLatch.countDown();
+
+ for (Map.Entry<Integer, BinaryType> e : metaBuf.entrySet())
+ addMeta(e.getKey(), e.getValue());
+
+ metaBuf.clear();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStop(boolean cancel) {
+ super.onKernalStop(cancel);
+
+ if (metaCacheQryId != null)
+ metaDataCache.context().continuousQueries().cancelInternalQuery(metaCacheQryId);
+ }
+
+ /**
+ * @param key Metadata key.
+ * @param newMeta Metadata.
+ */
+ private void addClientCacheMetaData(PortableMetaDataKey key, final BinaryType newMeta) {
+ clientMetaDataCache.compute(key,
+ new ConcurrentHashMap8.BiFun<PortableMetaDataKey, BinaryType, BinaryType>() {
+ @Override public BinaryType apply(PortableMetaDataKey key, BinaryType oldMeta) {
+ BinaryType res;
+
+ try {
+ res = checkMeta(key.typeId(), oldMeta, newMeta, null) ? newMeta : oldMeta;
+ }
+ catch (BinaryObjectException e) {
+ res = oldMeta;
+ }
+
+ return res;
+ }
+ }
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override public int typeId(String typeName) {
+ if (portableCtx == null)
+ return super.typeId(typeName);
+
+ return portableCtx.typeId(typeName);
+ }
+
+ /**
+ * @param obj Object.
+ * @return Bytes.
+ * @throws org.apache.ignite.binary.BinaryObjectException If failed.
+ */
+ public byte[] marshal(@Nullable Object obj) throws BinaryObjectException {
+ byte[] arr = portableMarsh.marshal(obj);
+
+ assert arr.length > 0;
+
+ return arr;
+ }
+
+ /**
+ * @param ptr Off-heap pointer.
+ * @param forceHeap If {@code true} creates heap-based object.
+ * @return Object.
+ * @throws org.apache.ignite.binary.BinaryObjectException If failed.
+ */
+ public Object unmarshal(long ptr, boolean forceHeap) throws BinaryObjectException {
+ assert ptr > 0 : ptr;
+
+ int size = UNSAFE.getInt(ptr);
+
+ ptr += 4;
+
+ byte type = UNSAFE.getByte(ptr++);
+
+ if (type != CacheObject.TYPE_BYTE_ARR) {
+ assert size > 0 : size;
+
+ PortableInputStream in = new PortableOffheapInputStream(ptr, size, forceHeap);
+
+ return portableMarsh.unmarshal(in);
+ }
+ else
+ return U.copyMemory(ptr, size);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object marshalToPortable(@Nullable Object obj) throws BinaryObjectException {
+ if (obj == null)
+ return null;
+
+ if (PortableUtils.isPortableType(obj.getClass()))
+ return obj;
+
+ if (obj instanceof Object[]) {
+ Object[] arr = (Object[])obj;
+
+ Object[] pArr = new Object[arr.length];
+
+ for (int i = 0; i < arr.length; i++)
+ pArr[i] = marshalToPortable(arr[i]);
+
+ return pArr;
+ }
+
+ if (obj instanceof Collection) {
+ Collection<Object> col = (Collection<Object>)obj;
+
+ Collection<Object> pCol;
+
+ if (col instanceof Set)
+ pCol = (Collection<Object>)PortableUtils.newSet((Set<?>)col);
+ else
+ pCol = new ArrayList<>(col.size());
+
+ for (Object item : col)
+ pCol.add(marshalToPortable(item));
+
+ return pCol;
+ }
+
+ if (obj instanceof Map) {
+ Map<?, ?> map = (Map<?, ?>)obj;
+
+ Map<Object, Object> pMap = PortableUtils.newMap((Map<Object, Object>)obj);
+
+ for (Map.Entry<?, ?> e : map.entrySet())
+ pMap.put(marshalToPortable(e.getKey()), marshalToPortable(e.getValue()));
+
+ return pMap;
+ }
+
+ if (obj instanceof Map.Entry) {
+ Map.Entry<?, ?> e = (Map.Entry<?, ?>)obj;
+
+ return new GridMapEntry<>(marshalToPortable(e.getKey()), marshalToPortable(e.getValue()));
+ }
+
+ byte[] arr = portableMarsh.marshal(obj);
+
+ assert arr.length > 0;
+
+ Object obj0 = portableMarsh.unmarshal(arr, null);
+
+ assert obj0 instanceof BinaryObject;
+
+ ((BinaryObjectImpl)obj0).detachAllowed(true);
+
+ return obj0;
+ }
+
+ /**
+ * @return Marshaller.
+ */
+ public GridPortableMarshaller marshaller() {
+ return portableMarsh;
+ }
+
+ /** {@inheritDoc} */
+ @Override public BinaryObjectBuilder builder(int typeId) {
+ return new BinaryObjectBuilderImpl(portableCtx, typeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public BinaryObjectBuilder builder(String clsName) {
+ return new BinaryObjectBuilderImpl(portableCtx, clsName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public BinaryObjectBuilder builder(BinaryObject portableObj) {
+ return BinaryObjectBuilderImpl.wrap(portableObj);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void updateMetaData(int typeId, String typeName, @Nullable String affKeyFieldName,
+ Map<String, Integer> fieldTypeIds) throws BinaryObjectException {
+ portableCtx.updateMetaData(typeId,
+ new BinaryMetaDataImpl(typeName, fieldTypeNames(fieldTypeIds), affKeyFieldName));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addMeta(final int typeId, final BinaryType newMeta) throws BinaryObjectException {
+ assert newMeta != null;
+
+ final PortableMetaDataKey key = new PortableMetaDataKey(typeId);
+
+ try {
+ BinaryType oldMeta = metaDataCache.localPeek(key);
+
+ if (oldMeta == null || checkMeta(typeId, oldMeta, newMeta, null)) {
+ BinaryObjectException err = metaDataCache.invoke(key, new MetaDataProcessor(typeId, newMeta));
+
+ if (err != null)
+ throw err;
+ }
+ }
+ catch (CacheException e) {
+ throw new BinaryObjectException("Failed to update meta data for type: " + newMeta.typeName(), e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public BinaryType metadata(final int typeId) throws BinaryObjectException {
+ try {
+ if (clientNode)
+ return clientMetaDataCache.get(new PortableMetaDataKey(typeId));
+
+ return metaDataCache.localPeek(new PortableMetaDataKey(typeId));
+ }
+ catch (CacheException e) {
+ throw new BinaryObjectException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<Integer, BinaryType> metadata(Collection<Integer> typeIds)
+ throws BinaryObjectException {
+ try {
+ Collection<PortableMetaDataKey> keys = new ArrayList<>(typeIds.size());
+
+ for (Integer typeId : typeIds)
+ keys.add(new PortableMetaDataKey(typeId));
+
+ Map<PortableMetaDataKey, BinaryType> meta = metaDataCache.getAll(keys);
+
+ Map<Integer, BinaryType> res = U.newHashMap(meta.size());
+
+ for (Map.Entry<PortableMetaDataKey, BinaryType> e : meta.entrySet())
+ res.put(e.getKey().typeId(), e.getValue());
+
+ return res;
+ }
+ catch (CacheException e) {
+ throw new BinaryObjectException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public Collection<BinaryType> metadata() throws BinaryObjectException {
+ if (clientNode)
+ return new ArrayList<>(clientMetaDataCache.values());
+
+ return F.viewReadOnly(metaDataCache.entrySetx(metaPred),
+ new C1<Cache.Entry<PortableMetaDataKey, BinaryType>, BinaryType>() {
+ private static final long serialVersionUID = 0L;
+
+ @Override public BinaryType apply(
+ Cache.Entry<PortableMetaDataKey, BinaryType> e) {
+ return e.getValue();
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteBinary binary() throws IgniteException {
+ return portables;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isPortableObject(Object obj) {
+ return obj instanceof BinaryObject;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isPortableEnabled(CacheConfiguration<?, ?> ccfg) {
+ return marsh instanceof PortableMarshaller;
+ }
+
+ /**
+ * @param po Portable object.
+ * @return Affinity key.
+ */
+ public Object affinityKey(BinaryObject po) {
+ try {
+ BinaryType meta = po.type();
+
+ if (meta != null) {
+ String affKeyFieldName = meta.affinityKeyFieldName();
+
+ if (affKeyFieldName != null)
+ return po.field(affKeyFieldName);
+ }
+ }
+ catch (BinaryObjectException e) {
+ U.error(log, "Failed to get affinity field from portable object: " + po, e);
+ }
+
+ return po;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int typeId(Object obj) {
+ if (obj == null)
+ return 0;
+
+ return isPortableObject(obj) ? ((BinaryObject)obj).typeId() : typeId(obj.getClass().getSimpleName());
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object field(Object obj, String fieldName) {
+ if (obj == null)
+ return null;
+
+ return isPortableObject(obj) ? ((BinaryObject)obj).field(fieldName) : super.field(obj, fieldName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasField(Object obj, String fieldName) {
+ return obj != null && ((BinaryObject)obj).hasField(fieldName);
+ }
+
+ /**
+ * @return Portable context.
+ */
+ public PortableContext portableContext() {
+ return portableCtx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheObjectContext contextForCache(CacheConfiguration cfg) throws IgniteCheckedException {
+ assert cfg != null;
+
+ boolean portableEnabled = marsh instanceof PortableMarshaller && !GridCacheUtils.isSystemCache(cfg.getName()) &&
+ !GridCacheUtils.isIgfsCache(ctx.config(), cfg.getName());
+
+ CacheObjectContext ctx0 = super.contextForCache(cfg);
+
+ CacheObjectContext res = new CacheObjectPortableContext(ctx,
+ ctx0.copyOnGet(),
+ ctx0.storeValue(),
+ portableEnabled,
+ ctx0.addDeploymentInfo());
+
+ ctx.resource().injectGeneric(res.defaultAffMapper());
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] marshal(CacheObjectContext ctx, Object val) throws IgniteCheckedException {
+ if (!((CacheObjectPortableContext)ctx).portableEnabled() || portableMarsh == null)
+ return super.marshal(ctx, val);
+
+ byte[] arr = portableMarsh.marshal(val);
+
+ assert arr.length > 0;
+
+ return arr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object unmarshal(CacheObjectContext ctx, byte[] bytes, ClassLoader clsLdr)
+ throws IgniteCheckedException {
+ if (!((CacheObjectPortableContext)ctx).portableEnabled() || portableMarsh == null)
+ return super.unmarshal(ctx, bytes, clsLdr);
+
+ return portableMarsh.unmarshal(bytes, clsLdr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj, boolean userObj) {
+ if (!((CacheObjectPortableContext)ctx).portableEnabled())
+ return super.toCacheKeyObject(ctx, obj, userObj);
+
+ if (obj instanceof KeyCacheObject)
+ return (KeyCacheObject)obj;
+
+ if (((CacheObjectPortableContext)ctx).portableEnabled()) {
+ obj = toPortable(obj);
+
+ if (obj instanceof BinaryObject)
+ return (BinaryObjectImpl)obj;
+ }
+
+ return toCacheKeyObject0(obj, userObj);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj,
+ boolean userObj) {
+ if (!((CacheObjectPortableContext)ctx).portableEnabled())
+ return super.toCacheObject(ctx, obj, userObj);
+
+ if (obj == null || obj instanceof CacheObject)
+ return (CacheObject)obj;
+
+ obj = toPortable(obj);
+
+ if (obj instanceof BinaryObject)
+ return (BinaryObjectImpl)obj;
+
+ return toCacheObject0(obj, userObj);
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheObject toCacheObject(CacheObjectContext ctx, byte type, byte[] bytes) {
+ if (type == BinaryObjectImpl.TYPE_BINARY)
+ return new BinaryObjectImpl(portableContext(), bytes, 0);
+
+ return super.toCacheObject(ctx, type, bytes);
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheObject toCacheObject(GridCacheContext ctx, long valPtr, boolean tmp)
+ throws IgniteCheckedException {
+ if (!((CacheObjectPortableContext)ctx.cacheObjectContext()).portableEnabled())
+ return super.toCacheObject(ctx, valPtr, tmp);
+
+ Object val = unmarshal(valPtr, !tmp);
+
+ if (val instanceof BinaryObjectOffheapImpl)
+ return (BinaryObjectOffheapImpl)val;
+
+ return new CacheObjectImpl(val, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object unwrapTemporary(GridCacheContext ctx, Object obj) throws BinaryObjectException {
+ if (!((CacheObjectPortableContext)ctx.cacheObjectContext()).portableEnabled())
+ return obj;
+
+ if (obj instanceof BinaryObjectOffheapImpl)
+ return ((BinaryObjectOffheapImpl)obj).heapCopy();
+
+ return obj;
+ }
+
+ /**
+ * @param obj Object.
+ * @return Portable object.
+ * @throws IgniteException In case of error.
+ */
+ @Nullable public Object toPortable(@Nullable Object obj) throws IgniteException {
+ if (obj == null)
+ return null;
+
+ if (isPortableObject(obj))
+ return obj;
+
+ return marshalToPortable(obj);
+ }
+
+ /**
+ * @param typeId Type ID.
+ * @param oldMeta Old meta.
+ * @param newMeta New meta.
+ * @param fields Fields map.
+ * @return Whether meta is changed.
+ * @throws org.apache.ignite.binary.BinaryObjectException In case of error.
+ */
+ private static boolean checkMeta(int typeId, @Nullable BinaryType oldMeta,
+ BinaryType newMeta, @Nullable Map<String, String> fields) throws BinaryObjectException {
+ assert newMeta != null;
+
+ Map<String, String> oldFields = oldMeta != null ? ((BinaryMetaDataImpl)oldMeta).fieldsMeta() : null;
+ Map<String, String> newFields = ((BinaryMetaDataImpl)newMeta).fieldsMeta();
+
+ boolean changed = false;
+
+ if (oldMeta != null) {
+ if (!oldMeta.typeName().equals(newMeta.typeName())) {
+ throw new BinaryObjectException(
+ "Two portable types have duplicate type ID [" +
+ "typeId=" + typeId +
+ ", typeName1=" + oldMeta.typeName() +
+ ", typeName2=" + newMeta.typeName() +
+ ']'
+ );
+ }
+
+ if (!F.eq(oldMeta.affinityKeyFieldName(), newMeta.affinityKeyFieldName())) {
+ throw new BinaryObjectException(
+ "Portable type has different affinity key fields on different clients [" +
+ "typeName=" + newMeta.typeName() +
+ ", affKeyFieldName1=" + oldMeta.affinityKeyFieldName() +
+ ", affKeyFieldName2=" + newMeta.affinityKeyFieldName() +
+ ']'
+ );
+ }
+
+ if (fields != null)
+ fields.putAll(oldFields);
+ }
+ else
+ changed = true;
+
+ for (Map.Entry<String, String> e : newFields.entrySet()) {
+ String typeName = oldFields != null ? oldFields.get(e.getKey()) : null;
+
+ if (typeName != null) {
+ if (!typeName.equals(e.getValue())) {
+ throw new BinaryObjectException(
+ "Portable field has different types on different clients [" +
+ "typeName=" + newMeta.typeName() +
+ ", fieldName=" + e.getKey() +
+ ", fieldTypeName1=" + typeName +
+ ", fieldTypeName2=" + e.getValue() +
+ ']'
+ );
+ }
+ }
+ else {
+ if (fields != null)
+ fields.put(e.getKey(), e.getValue());
+
+ changed = true;
+ }
+ }
+
+ return changed;
+ }
+
+ /**
+ */
+ private static class MetaDataProcessor implements
+ EntryProcessor<PortableMetaDataKey, BinaryType, BinaryObjectException>, Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private int typeId;
+
+ /** */
+ private BinaryType newMeta;
+
+ /**
+ * For {@link Externalizable}.
+ */
+ public MetaDataProcessor() {
+ // No-op.
+ }
+
+ /**
+ * @param typeId Type ID.
+ * @param newMeta New metadata.
+ */
+ private MetaDataProcessor(int typeId, BinaryType newMeta) {
+ assert newMeta != null;
+
+ this.typeId = typeId;
+ this.newMeta = newMeta;
+ }
+
+ /** {@inheritDoc} */
+ @Override public BinaryObjectException process(
+ MutableEntry<PortableMetaDataKey, BinaryType> entry,
+ Object... args) {
+ try {
+ BinaryType oldMeta = entry.getValue();
+
+ Map<String, String> fields = new HashMap<>();
+
+ if (checkMeta(typeId, oldMeta, newMeta, fields)) {
+ BinaryType res = new BinaryMetaDataImpl(newMeta.typeName(),
+ fields,
+ newMeta.affinityKeyFieldName());
+
+ entry.setValue(res);
+
+ return null;
+ }
+ else
+ return null;
+ }
+ catch (BinaryObjectException e) {
+ return e;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(typeId);
+ out.writeObject(newMeta);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ typeId = in.readInt();
+ newMeta = (BinaryType)in.readObject();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MetaDataProcessor.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ class MetaDataEntryListener implements CacheEntryUpdatedListener<PortableMetaDataKey, BinaryType> {
+ /** {@inheritDoc} */
+ @Override public void onUpdated(
+ Iterable<CacheEntryEvent<? extends PortableMetaDataKey, ? extends BinaryType>> evts)
+ throws CacheEntryListenerException {
+ for (CacheEntryEvent<? extends PortableMetaDataKey, ? extends BinaryType> evt : evts) {
+ assert evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED : evt;
+
+ PortableMetaDataKey key = evt.getKey();
+
+ final BinaryType newMeta = evt.getValue();
+
+ assert newMeta != null : evt;
+
+ addClientCacheMetaData(key, newMeta);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MetaDataEntryListener.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ static class MetaDataEntryFilter implements CacheEntryEventSerializableFilter<Object, Object> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException {
+ return evt.getKey() instanceof PortableMetaDataKey;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MetaDataEntryFilter.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ static class MetaDataPredicate implements IgniteBiPredicate<Object, Object> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(Object key, Object val) {
+ return key instanceof PortableMetaDataKey;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MetaDataPredicate.class, this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java
index 41a5a7a..c83c9af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java
@@ -17,17 +17,9 @@
package org.apache.ignite.internal.processors.cache.portable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.portable.PortableUtils;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.portable.PortableObject;
/**
*
@@ -60,129 +52,4 @@ public class CacheObjectPortableContext extends CacheObjectContext {
public boolean portableEnabled() {
return portableEnabled;
}
-
- /** {@inheritDoc} */
- @Override public Object unwrapPortableIfNeeded(Object o, boolean keepPortable) {
- if (o == null)
- return null;
-
- if (keepPortable || !portableEnabled() || !PortableUtils.isPortableOrCollectionType(o.getClass()))
- return o;
-
- return unwrapPortable(o);
- }
-
- /** {@inheritDoc} */
- @Override public Collection<Object> unwrapPortablesIfNeeded(Collection<Object> col, boolean keepPortable) {
- if (keepPortable || !portableEnabled())
- return col;
-
- if (col instanceof ArrayList)
- return unwrapPortables((ArrayList<Object>)col);
-
- if (col instanceof Set)
- return unwrapPortables((Set<Object>)col);
-
- Collection<Object> col0 = new ArrayList<>(col.size());
-
- for (Object obj : col)
- col0.add(unwrapPortable(obj));
-
- return col0;
- }
-
- /**
- * Unwraps map.
- *
- * @param map Map to unwrap.
- * @param keepPortable Keep portable flag.
- * @return Unwrapped collection.
- */
- public Map<Object, Object> unwrapPortablesIfNeeded(Map<Object, Object> map, boolean keepPortable) {
- if (keepPortable || !portableEnabled())
- return map;
-
- Map<Object, Object> map0 = PortableUtils.newMap(map);
-
- for (Map.Entry<Object, Object> e : map.entrySet())
- map0.put(unwrapPortable(e.getKey()), unwrapPortable(e.getValue()));
-
- return map0;
- }
-
- /**
- * Unwraps array list.
- *
- * @param col List to unwrap.
- * @return Unwrapped list.
- */
- private Collection<Object> unwrapPortables(ArrayList<Object> col) {
- int size = col.size();
-
- for (int i = 0; i < size; i++) {
- Object o = col.get(i);
-
- Object unwrapped = unwrapPortable(o);
-
- if (o != unwrapped)
- col.set(i, unwrapped);
- }
-
- return col;
- }
-
- /**
- * Unwraps set with portables.
- *
- * @param set Set to unwrap.
- * @return Unwrapped set.
- */
- private Set<Object> unwrapPortables(Set<Object> set) {
- Set<Object> set0 = PortableUtils.newSet(set);
-
- Iterator<Object> iter = set.iterator();
-
- while (iter.hasNext())
- set0.add(unwrapPortable(iter.next()));
-
- return set0;
- }
-
- /**
- * @param o Object to unwrap.
- * @return Unwrapped object.
- */
- private Object unwrapPortable(Object o) {
- if (o instanceof Map.Entry) {
- Map.Entry entry = (Map.Entry)o;
-
- Object key = entry.getKey();
-
- boolean unwrapped = false;
-
- if (key instanceof PortableObject) {
- key = ((PortableObject)key).deserialize();
-
- unwrapped = true;
- }
-
- Object val = entry.getValue();
-
- if (val instanceof PortableObject) {
- val = ((PortableObject)val).deserialize();
-
- unwrapped = true;
- }
-
- return unwrapped ? F.t(key, val) : o;
- }
- else if (o instanceof Collection)
- return unwrapPortablesIfNeeded((Collection<Object>)o, false);
- else if (o instanceof Map)
- return unwrapPortablesIfNeeded((Map<Object, Object>)o, false);
- else if (o instanceof PortableObject)
- return ((PortableObject)o).deserialize();
-
- return o;
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessor.java
deleted file mode 100644
index fcd73d2..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessor.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.portable;
-
-import java.util.Collection;
-import java.util.Map;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgnitePortables;
-import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
-import org.apache.ignite.portable.PortableBuilder;
-import org.apache.ignite.portable.PortableMetadata;
-import org.apache.ignite.portable.PortableObject;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Extended cache object processor interface with additional methods for portables.
- */
-public interface CacheObjectPortableProcessor extends IgniteCacheObjectProcessor {
- /**
- * @param typeId Type ID.
- * @return Builder.
- */
- public PortableBuilder builder(int typeId);
-
- /**
- * @param clsName Class name.
- * @return Builder.
- */
- public PortableBuilder builder(String clsName);
-
- /**
- * Creates builder initialized by existing portable object.
- *
- * @param portableObj Portable object to edit.
- * @return Portable builder.
- */
- public PortableBuilder builder(PortableObject portableObj);
-
- /**
- * @param typeId Type ID.
- * @param newMeta New meta data.
- * @throws IgniteException In case of error.
- */
- public void addMeta(int typeId, final PortableMetadata newMeta) throws IgniteException;
-
- /**
- * @param typeId Type ID.
- * @param typeName Type name.
- * @param affKeyFieldName Affinity key field name.
- * @param fieldTypeIds Fields map.
- * @throws IgniteException In case of error.
- */
- public void updateMetaData(int typeId, String typeName, @Nullable String affKeyFieldName,
- Map<String, Integer> fieldTypeIds) throws IgniteException;
-
- /**
- * @param typeId Type ID.
- * @return Meta data.
- * @throws IgniteException In case of error.
- */
- @Nullable public PortableMetadata metadata(int typeId) throws IgniteException;
-
- /**
- * @param typeIds Type ID.
- * @return Meta data.
- * @throws IgniteException In case of error.
- */
- public Map<Integer, PortableMetadata> metadata(Collection<Integer> typeIds) throws IgniteException;
-
- /**
- * @return Metadata for all types.
- * @throws IgniteException In case of error.
- */
- public Collection<PortableMetadata> metadata() throws IgniteException;
-
- /**
- * @return Portables interface.
- * @throws IgniteException If failed.
- */
- public IgnitePortables portables() throws IgniteException;
-
- /**
- * @param obj Original object.
- * @return Portable object (in case portable marshaller is used).
- * @throws IgniteException If failed.
- */
- public Object marshalToPortable(Object obj) throws IgniteException;
-}
\ No newline at end of file