You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/21 08:48:20 UTC
[02/45] incubator-ignite git commit: ignite-1258: open sourced
portables implementation
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1f2be19d/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapOutputStream.java
new file mode 100644
index 0000000..41d49d4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapOutputStream.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+/**
+ * Portable offheap output stream.
+ */
+public class GridPortableOffheapOutputStream extends GridPortableAbstractOutputStream {
+ /** Pointer. */
+ private long ptr;
+
+ /** Length of bytes that cen be used before resize is necessary. */
+ private int cap;
+
+ /**
+ * Constructor.
+ *
+ * @param cap Capacity.
+ */
+ public GridPortableOffheapOutputStream(int cap) {
+ this(0, cap);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param ptr Pointer to existing address.
+ * @param cap Capacity.
+ */
+ public GridPortableOffheapOutputStream(long ptr, int cap) {
+ this.ptr = ptr == 0 ? allocate(cap) : ptr;
+
+ this.cap = cap;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ release(ptr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void ensureCapacity(int cnt) {
+ if (cnt > cap) {
+ int newCap = capacity(cap, cnt);
+
+ ptr = reallocate(ptr, newCap);
+
+ cap = newCap;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] array() {
+ return arrayCopy();
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] arrayCopy() {
+ byte[] res = new byte[pos];
+
+ UNSAFE.copyMemory(null, ptr, res, BYTE_ARR_OFF, pos);
+
+ return res;
+ }
+
+ /**
+ * @return Pointer.
+ */
+ public long pointer() {
+ return ptr;
+ }
+
+ /**
+ * @return Capacity.
+ */
+ public int capacity() {
+ return cap;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeByteAndShift(byte val) {
+ UNSAFE.putByte(ptr + pos++, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void copyAndShift(Object src, long offset, int len) {
+ UNSAFE.copyMemory(src, offset, null, ptr + pos, len);
+
+ shift(len);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeShortFast(short val) {
+ UNSAFE.putShort(ptr + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeCharFast(char val) {
+ UNSAFE.putChar(ptr + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeIntFast(int val) {
+ UNSAFE.putInt(ptr + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeLongFast(long val) {
+ UNSAFE.putLong(ptr + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeIntPositioned(int pos, int val) {
+ if (!LITTLE_ENDIAN)
+ val = Integer.reverseBytes(val);
+
+ UNSAFE.putInt(ptr + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasArray() {
+ return false;
+ }
+
+ /**
+ * Allocate memory.
+ *
+ * @param cap Capacity.
+ * @return Pointer.
+ */
+ protected long allocate(int cap) {
+ return UNSAFE.allocateMemory(cap);
+ }
+
+ /**
+ * Reallocate memory.
+ *
+ * @param ptr Old pointer.
+ * @param cap Capacity.
+ * @return New pointer.
+ */
+ protected long reallocate(long ptr, int cap) {
+ return UNSAFE.reallocateMemory(ptr, cap);
+ }
+
+ /**
+ * Release memory.
+ *
+ * @param ptr Pointer.
+ */
+ protected void release(long ptr) {
+ UNSAFE.freeMemory(ptr);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1f2be19d/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableSimpleMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableSimpleMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableSimpleMemoryAllocator.java
new file mode 100644
index 0000000..8c27bf6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableSimpleMemoryAllocator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+import org.apache.ignite.internal.util.*;
+
+import sun.misc.*;
+
+/**
+ * Naive implementation of portable memory allocator.
+ */
+public class GridPortableSimpleMemoryAllocator implements GridPortableMemoryAllocator {
+ /** Unsafe. */
+ private static final Unsafe UNSAFE = GridUnsafe.unsafe();
+
+ /** Array offset: byte. */
+ protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
+
+ /** {@inheritDoc} */
+ @Override public byte[] allocate(int size) {
+ return new byte[size];
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] reallocate(byte[] data, int size) {
+ byte[] newData = new byte[size];
+
+ UNSAFE.copyMemory(data, BYTE_ARR_OFF, newData, BYTE_ARR_OFF, data.length);
+
+ return newData;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void release(byte[] data, int maxMsgSize) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public long allocateDirect(int size) {
+ return UNSAFE.allocateMemory(size);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long reallocateDirect(long addr, int size) {
+ return UNSAFE.reallocateMemory(addr, size);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void releaseDirect(long addr) {
+ UNSAFE.freeMemory(addr);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1f2be19d/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/package-info.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/package-info.java
new file mode 100644
index 0000000..1d39a70
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains portable APIs implementation for streams.
+ */
+package org.apache.ignite.internal.portable.streams;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1f2be19d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheDefaultPortableAffinityKeyMapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheDefaultPortableAffinityKeyMapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheDefaultPortableAffinityKeyMapper.java
new file mode 100644
index 0000000..f8be30c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheDefaultPortableAffinityKeyMapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.portable;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.portable.*;
+
+/**
+ *
+ */
+public class CacheDefaultPortableAffinityKeyMapper extends GridCacheDefaultAffinityKeyMapper {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public Object affinityKey(Object key) {
+ IgniteKernal kernal = (IgniteKernal)ignite;
+
+ CacheObjectPortableProcessorImpl proc = (CacheObjectPortableProcessorImpl)kernal.context().cacheObjects();
+
+ try {
+ key = proc.toPortable(key);
+ }
+ catch (IgniteException e) {
+ U.error(log, "Failed to marshal key to portable: " + key, e);
+ }
+
+ if (key instanceof PortableObject)
+ return proc.affinityKey((PortableObject)key);
+ else
+ return super.affinityKey(key);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1f2be19d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java
new file mode 100644
index 0000000..10deaf2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.portable;
+
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.portable.*;
+
+import java.util.*;
+
+/**
+ *
+ */
+public class CacheObjectPortableContext extends CacheObjectContext {
+ /** */
+ private boolean portableEnabled;
+
+ /**
+ * @param kernalCtx Kernal context.
+ * @param portableEnabled Portable enabled flag.
+ * @param cpyOnGet Copy on get flag.
+ * @param storeVal {@code True} if should store unmarshalled value in cache.
+ */
+ public CacheObjectPortableContext(GridKernalContext kernalCtx,
+ boolean cpyOnGet,
+ boolean storeVal,
+ boolean portableEnabled) {
+ super(kernalCtx, portableEnabled ? new CacheDefaultPortableAffinityKeyMapper() :
+ new GridCacheDefaultAffinityKeyMapper(), cpyOnGet, storeVal);
+
+ this.portableEnabled = portableEnabled;
+ }
+
+ /**
+ * @return Portable enabled flag.
+ */
+ public boolean portableEnabled() {
+ return portableEnabled;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object unwrapPortableIfNeeded(Object o, boolean keepPortable) {
+ if (o == null)
+ return null;
+
+ if (keepPortable || !portableEnabled() || !GridPortableUtils.isPortableOrCollectionType(o.getClass()))
+ return o;
+
+ return unwrapPortable(o);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<Object> unwrapPortablesIfNeeded(Collection<Object> col, boolean keepPortable) {
+ if (keepPortable || !portableEnabled())
+ return col;
+
+ if (col instanceof ArrayList)
+ return unwrapPortables((ArrayList<Object>)col);
+
+ Collection<Object> col0 = new ArrayList<>(col.size());
+
+ for (Object obj : col)
+ col0.add(unwrapPortable(obj));
+
+ return col0;
+ }
+
+ /**
+ * Unwraps map.
+ *
+ * @param map Map to unwrap.
+ * @param keepPortable Keep portable flag.
+ * @return Unwrapped collection.
+ */
+ public Map<Object, Object> unwrapPortablesIfNeeded(Map<Object, Object> map, boolean keepPortable) {
+ if (keepPortable || !portableEnabled())
+ return map;
+
+ Map<Object, Object> map0 = U.newHashMap(map.size());
+
+ for (Map.Entry<Object, Object> e : map.entrySet())
+ map0.put(unwrapPortable(e.getKey()), unwrapPortable(e.getValue()));
+
+ return map0;
+ }
+
+ /**
+ * Unwraps array list.
+ *
+ * @param col List to unwrap.
+ * @return Unwrapped list.
+ */
+ private Collection<Object> unwrapPortables(ArrayList<Object> col) {
+ int size = col.size();
+
+ for (int i = 0; i < size; i++) {
+ Object o = col.get(i);
+
+ Object unwrapped = unwrapPortable(o);
+
+ if (o != unwrapped)
+ col.set(i, unwrapped);
+ }
+
+ return col;
+ }
+
+ /**
+ * @param o Object to unwrap.
+ * @return Unwrapped object.
+ */
+ private Object unwrapPortable(Object o) {
+ if (o instanceof Map.Entry) {
+ Map.Entry entry = (Map.Entry)o;
+
+ Object key = entry.getKey();
+
+ boolean unwrapped = false;
+
+ if (key instanceof PortableObject) {
+ key = ((PortableObject)key).deserialize();
+
+ unwrapped = true;
+ }
+
+ Object val = entry.getValue();
+
+ if (val instanceof PortableObject) {
+ val = ((PortableObject)val).deserialize();
+
+ unwrapped = true;
+ }
+
+ return unwrapped ? F.t(key, val) : o;
+ }
+ else if (o instanceof Collection)
+ return unwrapPortablesIfNeeded((Collection<Object>)o, false);
+ else if (o instanceof Map)
+ return unwrapPortablesIfNeeded((Map<Object, Object>)o, false);
+ else if (o instanceof PortableObject)
+ return ((PortableObject)o).deserialize();
+
+ return o;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1f2be19d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessor.java
new file mode 100644
index 0000000..fb047d1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessor.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.portable;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.cacheobject.*;
+import org.apache.ignite.portable.*;
+
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Extended cache object processor interface with additional methods for portables.
+ */
+public interface CacheObjectPortableProcessor extends IgniteCacheObjectProcessor {
+ /**
+ * @param typeId Type ID.
+ * @return Builder.
+ */
+ public PortableBuilder builder(int typeId);
+
+ /**
+ * @param clsName Class name.
+ * @return Builder.
+ */
+ public PortableBuilder builder(String clsName);
+
+ /**
+ * Creates builder initialized by existing portable object.
+ *
+ * @param portableObj Portable object to edit.
+ * @return Portable builder.
+ */
+ public PortableBuilder builder(PortableObject portableObj);
+
+ /**
+ * @param typeId Type ID.
+ * @param newMeta New meta data.
+ * @throws IgniteException In case of error.
+ */
+ public void addMeta(int typeId, final PortableMetadata newMeta) throws IgniteException;
+
+ /**
+ * @param typeId Type ID.
+ * @param typeName Type name.
+ * @param affKeyFieldName Affinity key field name.
+ * @param fieldTypeIds Fields map.
+ * @throws IgniteException In case of error.
+ */
+ public void updateMetaData(int typeId, String typeName, @Nullable String affKeyFieldName,
+ Map<String, Integer> fieldTypeIds) throws IgniteException;
+
+ /**
+ * @param typeId Type ID.
+ * @return Meta data.
+ * @throws IgniteException In case of error.
+ */
+ @Nullable public PortableMetadata metadata(int typeId) throws IgniteException;
+
+ /**
+ * @param typeIds Type ID.
+ * @return Meta data.
+ * @throws IgniteException In case of error.
+ */
+ public Map<Integer, PortableMetadata> metadata(Collection<Integer> typeIds) throws IgniteException;
+
+ /**
+ * @return Metadata for all types.
+ * @throws IgniteException In case of error.
+ */
+ public Collection<PortableMetadata> metadata() throws IgniteException;
+
+ /**
+ * @return Portables interface.
+ * @throws IgniteException If failed.
+ */
+ public IgnitePortables portables() throws IgniteException;
+
+ /**
+ * @param obj Original object.
+ * @return Portable object (in case portable marshaller is used).
+ * @throws IgniteException If failed.
+ */
+ public Object marshalToPortable(Object obj) throws IgniteException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1f2be19d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java
new file mode 100644
index 0000000..15b2f00
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java
@@ -0,0 +1,949 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.portable;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.portable.streams.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.cacheobject.*;
+import org.apache.ignite.internal.processors.portable.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.marshaller.portable.*;
+import org.apache.ignite.portable.*;
+
+import org.jetbrains.annotations.*;
+import org.jsr166.*;
+import sun.misc.*;
+
+import javax.cache.Cache;
+import javax.cache.*;
+import javax.cache.event.*;
+import javax.cache.processor.*;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.*;
+
+/**
+ * Portable processor implementation.
+ */
+public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessorImpl implements
+ CacheObjectPortableProcessor {
+ /** */
+ public static final String[] FIELD_TYPE_NAMES;
+
+ /** */
+ private static final Unsafe UNSAFE = GridUnsafe.unsafe();
+
+ /** */
+ private final CountDownLatch startLatch = new CountDownLatch(1);
+
+ /** */
+ private final boolean clientNode;
+
+ /** */
+ private volatile IgniteCacheProxy<PortableMetaDataKey, PortableMetadata> metaDataCache;
+
+ /** */
+ private final ConcurrentHashMap8<PortableMetaDataKey, PortableMetadata> clientMetaDataCache;
+
+ /** Predicate to filter portable meta data in utility cache. */
+ private final CacheEntryPredicate metaPred = new CacheEntryPredicateAdapter() {
+ private static final long serialVersionUID = 0L;
+
+ @Override public boolean apply(GridCacheEntryEx e) {
+ return e.key().value(e.context().cacheObjectContext(), false) instanceof PortableMetaDataKey;
+ }
+ };
+
+ /** */
+ private GridPortableContext portableCtx;
+
+ /** */
+ private Marshaller marsh;
+
+ /** */
+ private GridPortableMarshaller portableMarsh;
+
+ /** */
+ @GridToStringExclude
+ private IgnitePortables portables;
+
+ /** Metadata updates collected before metadata cache is initialized. */
+ private final Map<Integer, GridPortableMetaDataImpl> metaBuf = new ConcurrentHashMap<>();
+
+ /** */
+ private UUID metaCacheQryId;
+
+ /**
+ *
+ */
+ static {
+ FIELD_TYPE_NAMES = new String[104];
+
+ FIELD_TYPE_NAMES[BYTE] = "byte";
+ FIELD_TYPE_NAMES[SHORT] = "short";
+ FIELD_TYPE_NAMES[INT] = "int";
+ FIELD_TYPE_NAMES[LONG] = "long";
+ FIELD_TYPE_NAMES[BOOLEAN] = "boolean";
+ FIELD_TYPE_NAMES[FLOAT] = "float";
+ FIELD_TYPE_NAMES[DOUBLE] = "double";
+ FIELD_TYPE_NAMES[CHAR] = "char";
+ FIELD_TYPE_NAMES[UUID] = "UUID";
+ FIELD_TYPE_NAMES[DECIMAL] = "decimal";
+ FIELD_TYPE_NAMES[STRING] = "String";
+ FIELD_TYPE_NAMES[DATE] = "Date";
+ FIELD_TYPE_NAMES[ENUM] = "Enum";
+ FIELD_TYPE_NAMES[OBJ] = "Object";
+ FIELD_TYPE_NAMES[PORTABLE_OBJ] = "Object";
+ FIELD_TYPE_NAMES[COL] = "Collection";
+ FIELD_TYPE_NAMES[MAP] = "Map";
+ FIELD_TYPE_NAMES[BYTE_ARR] = "byte[]";
+ FIELD_TYPE_NAMES[SHORT_ARR] = "short[]";
+ FIELD_TYPE_NAMES[INT_ARR] = "int[]";
+ FIELD_TYPE_NAMES[LONG_ARR] = "long[]";
+ FIELD_TYPE_NAMES[BOOLEAN_ARR] = "boolean[]";
+ FIELD_TYPE_NAMES[FLOAT_ARR] = "float[]";
+ FIELD_TYPE_NAMES[DOUBLE_ARR] = "double[]";
+ FIELD_TYPE_NAMES[CHAR_ARR] = "char[]";
+ FIELD_TYPE_NAMES[UUID_ARR] = "UUID[]";
+ FIELD_TYPE_NAMES[DECIMAL_ARR] = "decimal[]";
+ FIELD_TYPE_NAMES[STRING_ARR] = "String[]";
+ FIELD_TYPE_NAMES[DATE_ARR] = "Date[]";
+ FIELD_TYPE_NAMES[OBJ_ARR] = "Object[]";
+ FIELD_TYPE_NAMES[ENUM_ARR] = "Enum[]";
+ }
+
+ /**
+ * @param typeName Field type name.
+ * @return Field type ID;
+ */
+ @SuppressWarnings("StringEquality")
+ public static int fieldTypeId(String typeName) {
+ for (int i = 0; i < FIELD_TYPE_NAMES.length ; i++) {
+ String typeName0 = FIELD_TYPE_NAMES[i];
+
+ if (typeName.equals(typeName0))
+ return i;
+ }
+
+ throw new IllegalArgumentException("Invalid metadata type name: " + typeName);
+ }
+
+ /**
+ * @param typeId Field type ID.
+ * @return Field type name.
+ */
+ public static String fieldTypeName(int typeId) {
+ assert typeId >= 0 && typeId < FIELD_TYPE_NAMES.length : typeId;
+
+ String typeName = FIELD_TYPE_NAMES[typeId];
+
+ assert typeName != null : typeId;
+
+ return typeName;
+ }
+
+ /**
+ * @param typeIds Field type IDs.
+ * @return Field type names.
+ */
+ public static Map<String, String> fieldTypeNames(Map<String, Integer> typeIds) {
+ Map<String, String> names = U.newHashMap(typeIds.size());
+
+ for (Map.Entry<String, Integer> e : typeIds.entrySet())
+ names.put(e.getKey(), fieldTypeName(e.getValue()));
+
+ return names;
+ }
+
+ /**
+ * @param ctx Kernal context.
+ */
+ public CacheObjectPortableProcessorImpl(GridKernalContext ctx) {
+ super(ctx);
+
+ marsh = ctx.grid().configuration().getMarshaller();
+
+ clientNode = this.ctx.clientNode();
+
+ clientMetaDataCache = clientNode ? new ConcurrentHashMap8<PortableMetaDataKey, PortableMetadata>() : null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteCheckedException {
+ if (marsh instanceof PortableMarshaller) {
+ GridPortableMetaDataHandler metaHnd = new GridPortableMetaDataHandler() {
+ @Override public void addMeta(int typeId, GridPortableMetaDataImpl newMeta)
+ throws PortableException {
+ if (metaDataCache == null) {
+ GridPortableMetaDataImpl oldMeta = metaBuf.get(typeId);
+
+ if (oldMeta == null || checkMeta(typeId, oldMeta, newMeta, null)) {
+ synchronized (this) {
+ Map<String, String> fields = new HashMap<>();
+
+ if (checkMeta(typeId, oldMeta, newMeta, fields)) {
+ newMeta = new GridPortableMetaDataImpl(newMeta.typeName(),
+ fields,
+ newMeta.affinityKeyFieldName());
+
+ metaBuf.put(typeId, newMeta);
+ } else
+ return;
+ }
+
+ if (metaDataCache == null)
+ return;
+ else
+ metaBuf.remove(typeId);
+ } else
+ return;
+ }
+
+ CacheObjectPortableProcessorImpl.this.addMeta(typeId, newMeta);
+ }
+
+ @Override public PortableMetadata metadata(int typeId) throws PortableException {
+ if (metaDataCache == null)
+ U.awaitQuiet(startLatch);
+
+ return CacheObjectPortableProcessorImpl.this.metadata(typeId);
+ }
+ };
+
+ PortableMarshaller pMarh0 = (PortableMarshaller)marsh;
+
+ portableCtx = new GridPortableContext(metaHnd, ctx.gridName());
+
+ IgniteUtils.invoke(PortableMarshaller.class, pMarh0, "setPortableContext", portableCtx);
+
+ portableMarsh = new GridPortableMarshaller(portableCtx);
+
+ portables = new IgnitePortablesImpl(ctx, this);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void onUtilityCacheStarted() throws IgniteCheckedException {
+ metaDataCache = ctx.cache().jcache(CU.UTILITY_CACHE_NAME);
+
+ if (clientNode) {
+ assert !metaDataCache.context().affinityNode();
+
+ metaCacheQryId = metaDataCache.context().continuousQueries().executeInternalQuery(
+ new MetaDataEntryListener(),
+ new MetaDataEntryFilter(),
+ false,
+ true);
+
+ while (true) {
+ ClusterNode oldestSrvNode =
+ CU.oldestAliveCacheServerNode(ctx.cache().context(), AffinityTopologyVersion.NONE);
+
+ if (oldestSrvNode == null)
+ break;
+
+ GridCacheQueryManager qryMgr = metaDataCache.context().queries();
+
+ CacheQuery<Map.Entry<PortableMetaDataKey, PortableMetadata>> qry =
+ qryMgr.createScanQuery(new MetaDataPredicate(), null, false);
+
+ qry.keepAll(false);
+
+ qry.projection(ctx.cluster().get().forNode(oldestSrvNode));
+
+ try {
+ CacheQueryFuture<Map.Entry<PortableMetaDataKey, PortableMetadata>> fut = qry.execute();
+
+ Map.Entry<PortableMetaDataKey, PortableMetadata> next;
+
+ while ((next = fut.next()) != null) {
+ assert next.getKey() != null : next;
+ assert next.getValue() != null : next;
+
+ addClientCacheMetaData(next.getKey(), next.getValue());
+ }
+ }
+ catch (IgniteCheckedException e) {
+ if (!ctx.discovery().alive(oldestSrvNode) || !ctx.discovery().pingNode(oldestSrvNode.id()))
+ continue;
+ else
+ throw e;
+ }
+
+ break;
+ }
+ }
+
+ startLatch.countDown();
+
+ for (Map.Entry<Integer, GridPortableMetaDataImpl> e : metaBuf.entrySet())
+ addMeta(e.getKey(), e.getValue());
+
+ metaBuf.clear();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStop(boolean cancel) {
+ super.onKernalStop(cancel);
+
+ if (metaCacheQryId != null)
+ metaDataCache.context().continuousQueries().cancelInternalQuery(metaCacheQryId);
+ }
+
+ /**
+ * @param key Metadata key.
+ * @param newMeta Metadata.
+ */
+ private void addClientCacheMetaData(PortableMetaDataKey key, final PortableMetadata newMeta) {
+ clientMetaDataCache.compute(key,
+ new ConcurrentHashMap8.BiFun<PortableMetaDataKey, PortableMetadata, PortableMetadata>() {
+ @Override public PortableMetadata apply(PortableMetaDataKey key, PortableMetadata oldMeta) {
+ PortableMetadata res;
+
+ try {
+ res = checkMeta(key.typeId(), oldMeta, newMeta, null) ? newMeta : oldMeta;
+ }
+ catch (PortableException e) {
+ res = oldMeta;
+ }
+
+ return res;
+ }
+ }
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override public int typeId(String typeName) {
+ return portableCtx.typeId(typeName);
+ }
+
+ /**
+ * @param obj Object.
+ * @return Bytes.
+ * @throws PortableException If failed.
+ */
+ public byte[] marshal(@Nullable Object obj) throws PortableException {
+ byte[] arr = portableMarsh.marshal(obj, 0);
+
+ assert arr.length > 0;
+
+ return arr;
+ }
+
+ /**
+ * @param ptr Off-heap pointer.
+ * @param forceHeap If {@code true} creates heap-based object.
+ * @return Object.
+ * @throws PortableException If failed.
+ */
+ public Object unmarshal(long ptr, boolean forceHeap) throws PortableException {
+ assert ptr > 0 : ptr;
+
+ int size = UNSAFE.getInt(ptr);
+
+ ptr += 4;
+
+ byte type = UNSAFE.getByte(ptr++);
+
+ if (type != CacheObject.TYPE_BYTE_ARR) {
+ assert size > 0 : size;
+
+ GridPortableInputStream in = new GridPortableOffheapInputStream(ptr, size, forceHeap);
+
+ return portableMarsh.unmarshal(in);
+ }
+ else
+ return U.copyMemory(ptr, size);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object marshalToPortable(@Nullable Object obj) throws PortableException {
+ if (obj == null)
+ return null;
+
+ if (GridPortableUtils.isPortableType(obj.getClass()))
+ return obj;
+
+ if (obj instanceof Object[]) {
+ Object[] arr = (Object[])obj;
+
+ Object[] pArr = new Object[arr.length];
+
+ for (int i = 0; i < arr.length; i++)
+ pArr[i] = marshalToPortable(arr[i]);
+
+ return pArr;
+ }
+
+ if (obj instanceof Collection) {
+ Collection<?> col = (Collection<?>)obj;
+
+ Collection<Object> pCol = new ArrayList<>(col.size());
+
+ for (Object item : col)
+ pCol.add(marshalToPortable(item));
+
+ return pCol;
+ }
+
+ if (obj instanceof Map) {
+ Map<?, ?> map = (Map<?, ?>)obj;
+
+ Map<Object, Object> pMap = new HashMap<>(map.size(), 1.0f);
+
+ for (Map.Entry<?, ?> e : map.entrySet())
+ pMap.put(marshalToPortable(e.getKey()), marshalToPortable(e.getValue()));
+
+ return pMap;
+ }
+
+ if (obj instanceof Map.Entry) {
+ Map.Entry<?, ?> e = (Map.Entry<?, ?>)obj;
+
+ return new GridMapEntry<>(marshalToPortable(e.getKey()), marshalToPortable(e.getValue()));
+ }
+
+ byte[] arr = portableMarsh.marshal(obj, 0);
+
+ assert arr.length > 0;
+
+ Object obj0 = portableMarsh.unmarshal(arr, null);
+
+ assert obj0 instanceof PortableObject;
+
+ ((GridPortableObjectImpl)obj0).detachAllowed(true);
+
+ return obj0;
+ }
+
+ /**
+ * @return Marshaller.
+ */
+ public GridPortableMarshaller marshaller() {
+ return portableMarsh;
+ }
+
+ /** {@inheritDoc} */
+ @Override public PortableBuilder builder(int typeId) {
+ return new GridPortableBuilderImpl(portableCtx, typeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public PortableBuilder builder(String clsName) {
+ return new GridPortableBuilderImpl(portableCtx, clsName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public PortableBuilder builder(PortableObject portableObj) {
+ return GridPortableBuilderImpl.wrap(portableObj);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void updateMetaData(int typeId, String typeName, @Nullable String affKeyFieldName,
+ Map<String, Integer> fieldTypeIds) throws PortableException {
+ portableCtx.updateMetaData(typeId,
+ new GridPortableMetaDataImpl(typeName, fieldTypeNames(fieldTypeIds), affKeyFieldName));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addMeta(final int typeId, final PortableMetadata newMeta) throws PortableException {
+ assert newMeta != null;
+
+ final PortableMetaDataKey key = new PortableMetaDataKey(typeId);
+
+ try {
+ PortableMetadata oldMeta = metaDataCache.localPeek(key);
+
+ if (oldMeta == null || checkMeta(typeId, oldMeta, newMeta, null)) {
+ PortableException err = metaDataCache.invoke(key, new MetaDataProcessor(typeId, newMeta));
+
+ if (err != null)
+ throw err;
+ }
+ }
+ catch (CacheException e) {
+ throw new PortableException("Failed to update meta data for type: " + newMeta.typeName(), e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public PortableMetadata metadata(final int typeId) throws PortableException {
+ try {
+ if (clientNode)
+ return clientMetaDataCache.get(new PortableMetaDataKey(typeId));
+
+ return metaDataCache.localPeek(new PortableMetaDataKey(typeId));
+ }
+ catch (CacheException e) {
+ throw new PortableException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<Integer, PortableMetadata> metadata(Collection<Integer> typeIds)
+ throws PortableException {
+ try {
+ Collection<PortableMetaDataKey> keys = new ArrayList<>(typeIds.size());
+
+ for (Integer typeId : typeIds)
+ keys.add(new PortableMetaDataKey(typeId));
+
+ Map<PortableMetaDataKey, PortableMetadata> meta = metaDataCache.getAll(keys);
+
+ Map<Integer, PortableMetadata> res = U.newHashMap(meta.size());
+
+ for (Map.Entry<PortableMetaDataKey, PortableMetadata> e : meta.entrySet())
+ res.put(e.getKey().typeId(), e.getValue());
+
+ return res;
+ }
+ catch (CacheException e) {
+ throw new PortableException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public Collection<PortableMetadata> metadata() throws PortableException {
+ if (clientNode)
+ return new ArrayList<>(clientMetaDataCache.values());
+
+ return F.viewReadOnly(metaDataCache.entrySetx(metaPred),
+ new C1<Cache.Entry<PortableMetaDataKey, PortableMetadata>, PortableMetadata>() {
+ private static final long serialVersionUID = 0L;
+
+ @Override public PortableMetadata apply(
+ Cache.Entry<PortableMetaDataKey, PortableMetadata> e) {
+ return e.getValue();
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgnitePortables portables() throws IgniteException {
+ return portables;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isPortableObject(Object obj) {
+ return obj instanceof PortableObject;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isPortableEnabled() {
+ return marsh instanceof PortableMarshaller;
+ }
+
+ /**
+ * @param po Portable object.
+ * @return Affinity key.
+ */
+ public Object affinityKey(PortableObject po) {
+ try {
+ PortableMetadata meta = po.metaData();
+
+ if (meta != null) {
+ String affKeyFieldName = meta.affinityKeyFieldName();
+
+ if (affKeyFieldName != null)
+ return po.field(affKeyFieldName);
+ }
+ }
+ catch (PortableException e) {
+ U.error(log, "Failed to get affinity field from portable object: " + po, e);
+ }
+
+ return po;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int typeId(Object obj) {
+ if (obj == null)
+ return 0;
+
+ return isPortableObject(obj) ? ((PortableObject)obj).typeId() : typeId(obj.getClass().getSimpleName());
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object field(Object obj, String fieldName) {
+ if (obj == null)
+ return null;
+
+ return isPortableObject(obj) ? ((PortableObject)obj).field(fieldName) : super.field(obj, fieldName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasField(Object obj, String fieldName) {
+ return obj != null && ((PortableObject)obj).hasField(fieldName);
+ }
+
+ /**
+ * @return Portable context.
+ */
+ public GridPortableContext portableContext() {
+ return portableCtx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheObjectContext contextForCache(CacheConfiguration cfg) throws IgniteCheckedException {
+ assert cfg != null;
+
+ boolean portableEnabled = marsh instanceof PortableMarshaller && !GridCacheUtils.isSystemCache(cfg.getName()) &&
+ !GridCacheUtils.isIgfsCache(ctx.config(), cfg.getName());
+
+ CacheObjectContext ctx0 = super.contextForCache(cfg);
+
+ CacheObjectContext res = new CacheObjectPortableContext(ctx,
+ ctx0.copyOnGet(),
+ ctx0.storeValue(),
+ portableEnabled);
+
+ ctx.resource().injectGeneric(res.defaultAffMapper());
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] marshal(CacheObjectContext ctx, Object val) throws IgniteCheckedException {
+ if (!((CacheObjectPortableContext)ctx).portableEnabled() || portableMarsh == null)
+ return super.marshal(ctx, val);
+
+ byte[] arr = portableMarsh.marshal(val, 0);
+
+ assert arr.length > 0;
+
+ return arr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object unmarshal(CacheObjectContext ctx, byte[] bytes, ClassLoader clsLdr)
+ throws IgniteCheckedException
+ {
+ if (!((CacheObjectPortableContext)ctx).portableEnabled() || portableMarsh == null)
+ return super.unmarshal(ctx, bytes, clsLdr);
+
+ return portableMarsh.unmarshal(bytes, clsLdr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj, boolean userObj) {
+ if (!((CacheObjectPortableContext)ctx).portableEnabled())
+ return super.toCacheKeyObject(ctx, obj, userObj);
+
+ if (obj instanceof KeyCacheObject)
+ return (KeyCacheObject)obj;
+
+ if (((CacheObjectPortableContext)ctx).portableEnabled()) {
+ obj = toPortable(obj);
+
+ if (obj instanceof PortableObject)
+ return (GridPortableObjectImpl)obj;
+ }
+
+ return toCacheKeyObject0(obj, userObj);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj, boolean userObj) {
+ if (!((CacheObjectPortableContext)ctx).portableEnabled())
+ return super.toCacheObject(ctx, obj, userObj);
+
+ if (obj == null || obj instanceof CacheObject)
+ return (CacheObject)obj;
+
+ obj = toPortable(obj);
+
+ if (obj instanceof PortableObject)
+ return (GridPortableObjectImpl)obj;
+
+ return toCacheObject0(obj, userObj);
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheObject toCacheObject(CacheObjectContext ctx, byte type, byte[] bytes) {
+ if (type == GridPortableObjectImpl.TYPE_PORTABLE)
+ return new GridPortableObjectImpl(portableContext(), bytes, 0);
+
+ return super.toCacheObject(ctx, type, bytes);
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheObject toCacheObject(GridCacheContext ctx, long valPtr, boolean tmp)
+ throws IgniteCheckedException {
+ if (!((CacheObjectPortableContext)ctx.cacheObjectContext()).portableEnabled())
+ return super.toCacheObject(ctx, valPtr, tmp);
+
+ Object val = unmarshal(valPtr, !tmp);
+
+ if (val instanceof GridPortableObjectOffheapImpl)
+ return (GridPortableObjectOffheapImpl)val;
+
+ return new CacheObjectImpl(val, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object unwrapTemporary(GridCacheContext ctx, Object obj) throws PortableException {
+ if (!((CacheObjectPortableContext)ctx.cacheObjectContext()).portableEnabled())
+ return obj;
+
+ if (obj instanceof GridPortableObjectOffheapImpl)
+ return ((GridPortableObjectOffheapImpl)obj).heapCopy();
+
+ return obj;
+ }
+
+ /**
+ * @param obj Object.
+ * @return Portable object.
+ * @throws IgniteException In case of error.
+ */
+ @Nullable public Object toPortable(@Nullable Object obj) throws IgniteException {
+ if (obj == null)
+ return null;
+
+ if (isPortableObject(obj))
+ return obj;
+
+ return marshalToPortable(obj);
+ }
+
+ /**
+ * @param typeId Type ID.
+ * @param oldMeta Old meta.
+ * @param newMeta New meta.
+ * @param fields Fields map.
+ * @return Whether meta is changed.
+ * @throws PortableException In case of error.
+ */
+ private static boolean checkMeta(int typeId, @Nullable PortableMetadata oldMeta,
+ PortableMetadata newMeta, @Nullable Map<String, String> fields) throws PortableException {
+ assert newMeta != null;
+
+ Map<String, String> oldFields = oldMeta != null ? ((GridPortableMetaDataImpl)oldMeta).fieldsMeta() : null;
+ Map<String, String> newFields = ((GridPortableMetaDataImpl)newMeta).fieldsMeta();
+
+ boolean changed = false;
+
+ if (oldMeta != null) {
+ if (!oldMeta.typeName().equals(newMeta.typeName())) {
+ throw new PortableException(
+ "Two portable types have duplicate type ID [" +
+ "typeId=" + typeId +
+ ", typeName1=" + oldMeta.typeName() +
+ ", typeName2=" + newMeta.typeName() +
+ ']'
+ );
+ }
+
+ if (!F.eq(oldMeta.affinityKeyFieldName(), newMeta.affinityKeyFieldName())) {
+ throw new PortableException(
+ "Portable type has different affinity key fields on different clients [" +
+ "typeName=" + newMeta.typeName() +
+ ", affKeyFieldName1=" + oldMeta.affinityKeyFieldName() +
+ ", affKeyFieldName2=" + newMeta.affinityKeyFieldName() +
+ ']'
+ );
+ }
+
+ if (fields != null)
+ fields.putAll(oldFields);
+ }
+ else
+ changed = true;
+
+ for (Map.Entry<String, String> e : newFields.entrySet()) {
+ String typeName = oldFields != null ? oldFields.get(e.getKey()) : null;
+
+ if (typeName != null) {
+ if (!typeName.equals(e.getValue())) {
+ throw new PortableException(
+ "Portable field has different types on different clients [" +
+ "typeName=" + newMeta.typeName() +
+ ", fieldName=" + e.getKey() +
+ ", fieldTypeName1=" + typeName +
+ ", fieldTypeName2=" + e.getValue() +
+ ']'
+ );
+ }
+ }
+ else {
+ if (fields != null)
+ fields.put(e.getKey(), e.getValue());
+
+ changed = true;
+ }
+ }
+
+ return changed;
+ }
+
+ /**
+ */
+ private static class MetaDataProcessor implements
+ EntryProcessor<PortableMetaDataKey, PortableMetadata, PortableException>, Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private int typeId;
+
+ /** */
+ private PortableMetadata newMeta;
+
+ /**
+ * For {@link Externalizable}.
+ */
+ public MetaDataProcessor() {
+ // No-op.
+ }
+
+ /**
+ * @param typeId Type ID.
+ * @param newMeta New metadata.
+ */
+ private MetaDataProcessor(int typeId, PortableMetadata newMeta) {
+ assert newMeta != null;
+
+ this.typeId = typeId;
+ this.newMeta = newMeta;
+ }
+
+ /** {@inheritDoc} */
+ @Override public PortableException process(
+ MutableEntry<PortableMetaDataKey, PortableMetadata> entry,
+ Object... args) {
+ try {
+ PortableMetadata oldMeta = entry.getValue();
+
+ Map<String, String> fields = new HashMap<>();
+
+ if (checkMeta(typeId, oldMeta, newMeta, fields)) {
+ PortableMetadata res = new GridPortableMetaDataImpl(newMeta.typeName(),
+ fields,
+ newMeta.affinityKeyFieldName());
+
+ entry.setValue(res);
+
+ return null;
+ }
+ else
+ return null;
+ }
+ catch (PortableException e) {
+ return e;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(typeId);
+ out.writeObject(newMeta);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ typeId = in.readInt();
+ newMeta = (PortableMetadata)in.readObject();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MetaDataProcessor.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ class MetaDataEntryListener implements CacheEntryUpdatedListener<PortableMetaDataKey, PortableMetadata> {
+ /** {@inheritDoc} */
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends PortableMetaDataKey, ? extends PortableMetadata>> evts)
+ throws CacheEntryListenerException {
+ for (CacheEntryEvent<? extends PortableMetaDataKey, ? extends PortableMetadata> evt : evts) {
+ assert evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED : evt;
+
+ PortableMetaDataKey key = evt.getKey();
+
+ final PortableMetadata newMeta = evt.getValue();
+
+ assert newMeta != null : evt;
+
+ addClientCacheMetaData(key, newMeta);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MetaDataEntryListener.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ static class MetaDataEntryFilter implements CacheEntryEventSerializableFilter<Object, Object> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException {
+ return evt.getKey() instanceof PortableMetaDataKey;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MetaDataEntryFilter.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ static class MetaDataPredicate implements IgniteBiPredicate<Object, Object> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(Object key, Object val) {
+ return key instanceof PortableMetaDataKey;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MetaDataPredicate.class, this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1f2be19d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/IgnitePortablesImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/IgnitePortablesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/IgnitePortablesImpl.java
new file mode 100644
index 0000000..d819a56
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/IgnitePortablesImpl.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.portable;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cacheobject.*;
+import org.apache.ignite.portable.*;
+
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * {@link IgnitePortables} implementation.
+ */
+public class IgnitePortablesImpl implements IgnitePortables {
+ /** */
+ private GridKernalContext ctx;
+
+ /** */
+ private CacheObjectPortableProcessor proc;
+
+ /**
+ * @param ctx Context.
+ */
+ public IgnitePortablesImpl(GridKernalContext ctx, CacheObjectPortableProcessor proc) {
+ this.ctx = ctx;
+
+ this.proc = proc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int typeId(String typeName) {
+ guard();
+
+ try {
+ return proc.typeId(typeName);
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T toPortable(@Nullable Object obj) throws PortableException {
+ guard();
+
+ try {
+ return (T)proc.marshalToPortable(obj);
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public PortableBuilder builder(int typeId) {
+ guard();
+
+ try {
+ return proc.builder(typeId);
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public PortableBuilder builder(String typeName) {
+ guard();
+
+ try {
+ return proc.builder(typeName);
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public PortableBuilder builder(PortableObject portableObj) {
+ guard();
+
+ try {
+ return proc.builder(portableObj);
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public PortableMetadata metadata(Class<?> cls) throws PortableException {
+ guard();
+
+ try {
+ return proc.metadata(proc.typeId(cls.getName()));
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public PortableMetadata metadata(String typeName) throws PortableException {
+ guard();
+
+ try {
+ return proc.metadata(proc.typeId(typeName));
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public PortableMetadata metadata(int typeId) throws PortableException {
+ guard();
+
+ try {
+ return proc.metadata(typeId);
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<PortableMetadata> metadata() throws PortableException {
+ guard();
+
+ try {
+ return proc.metadata();
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /**
+ * @return Portable processor.
+ */
+ public IgniteCacheObjectProcessor processor() {
+ return proc;
+ }
+
+ /**
+ * <tt>ctx.gateway().readLock()</tt>
+ */
+ private void guard() {
+ ctx.gateway().readLock();
+ }
+
+ /**
+ * <tt>ctx.gateway().readUnlock()</tt>
+ */
+ private void unguard() {
+ ctx.gateway().readUnlock();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1f2be19d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/PortableMetaDataKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/PortableMetaDataKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/PortableMetaDataKey.java
new file mode 100644
index 0000000..d2d53cd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/PortableMetaDataKey.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.portable;
+
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Key for portable meta data.
+ */
+class PortableMetaDataKey extends GridCacheUtilityKey<PortableMetaDataKey> implements Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private int typeId;
+
+ /**
+ * For {@link Externalizable}.
+ */
+ public PortableMetaDataKey() {
+ // No-op.
+ }
+
+ /**
+ * @param typeId Type ID.
+ */
+ PortableMetaDataKey(int typeId) {
+ this.typeId = typeId;
+ }
+
+ /**
+ * @return Type id.
+ */
+ public int typeId() {
+ return typeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(typeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ typeId = in.readInt();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean equalsx(PortableMetaDataKey key) {
+ return typeId == key.typeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return typeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(PortableMetaDataKey.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1f2be19d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/package-info.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/package-info.java
new file mode 100644
index 0000000..6c30811
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Implementation of portable processor.
+ */
+package org.apache.ignite.internal.processors.cache.portable;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1f2be19d/modules/core/src/main/java/org/apache/ignite/marshaller/portable/PortableMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/portable/PortableMarshaller.java b/modules/core/src/main/java/org/apache/ignite/marshaller/portable/PortableMarshaller.java
new file mode 100644
index 0000000..08f9336
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/portable/PortableMarshaller.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.marshaller.portable;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.portable.*;
+
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.sql.*;
+import java.util.*;
+
+/**
+ * Implementation of {@link org.apache.ignite.marshaller.Marshaller} that lets to serialize and deserialize all objects
+ * in the portable format.
+ * <p>
+ * {@code PortableMarshaller} is tested only on Java HotSpot VM on other VMs
+ * it could yield unexpected results.
+ * <p>
+ * <h1 class="header">Configuration</h1>
+ * <h2 class="header">Mandatory</h2>
+ * This marshaller has no mandatory configuration parameters.
+ * <h2 class="header">Java Example</h2>
+ * <pre name="code" class="java">
+ * PortableMarshaller marshaller = new PortableMarshaller();
+ *
+ * IgniteConfiguration cfg = new IgniteConfiguration();
+ *
+ * // Override marshaller.
+ * cfg.setMarshaller(marshaller);
+ *
+ * // Starts grid.
+ * G.start(cfg);
+ * </pre>
+ * <h2 class="header">Spring Example</h2>
+ * PortableMarshaller can be configured from Spring XML configuration file:
+ * <pre name="code" class="xml">
+ * <bean id="grid.custom.cfg" class="org.apache.ignite.configuration.IgniteConfiguration" singleton="true">
+ * ...
+ * <property name="marshaller">
+ * <bean class="org.gridgain.grid.marshaller.portable.PortableMarshaller">
+ * ...
+ * </bean>
+ * </property>
+ * ...
+ * </bean>
+ * </pre>
+ * <p>
+ * <img src="http://ignite.incubator.apache.org/images/spring-small.png">
+ * <br>
+ * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a>
+ */
+public class PortableMarshaller extends AbstractMarshaller {
+ /** Default portable protocol version. */
+ public static final PortableProtocolVersion DFLT_PORTABLE_PROTO_VER = PortableProtocolVersion.VER_1_4_0;
+
+ /** Class names. */
+ private Collection<String> clsNames;
+
+ /** ID mapper. */
+ private PortableIdMapper idMapper;
+
+ /** Serializer. */
+ private PortableSerializer serializer;
+
+ /** Types. */
+ private Collection<PortableTypeConfiguration> typeCfgs;
+
+ /** Use timestamp flag. */
+ private boolean useTs = true;
+
+ /** Whether to convert string to bytes using UTF-8 encoding. */
+ private boolean convertString = true;
+
+ /** Meta data enabled flag. */
+ private boolean metaDataEnabled = true;
+
+ /** Keep deserialized flag. */
+ private boolean keepDeserialized = true;
+
+ /** Protocol version. */
+ private PortableProtocolVersion protoVer = DFLT_PORTABLE_PROTO_VER;
+
+ /** */
+ private GridPortableMarshaller impl;
+
+ /**
+ * Gets class names.
+ *
+ * @return Class names.
+ */
+ public Collection<String> getClassNames() {
+ return clsNames;
+ }
+
+ /**
+ * Sets class names of portable objects explicitly.
+ *
+ * @param clsNames Class names.
+ */
+ public void setClassNames(Collection<String> clsNames) {
+ this.clsNames = new ArrayList<>(clsNames.size());
+
+ for (String clsName : clsNames)
+ this.clsNames.add(clsName.trim());
+ }
+
+ /**
+ * Gets ID mapper.
+ *
+ * @return ID mapper.
+ */
+ public PortableIdMapper getIdMapper() {
+ return idMapper;
+ }
+
+ /**
+ * Sets ID mapper.
+ *
+ * @param idMapper ID mapper.
+ */
+ public void setIdMapper(PortableIdMapper idMapper) {
+ this.idMapper = idMapper;
+ }
+
+ /**
+ * Gets serializer.
+ *
+ * @return Serializer.
+ */
+ public PortableSerializer getSerializer() {
+ return serializer;
+ }
+
+ /**
+ * Sets serializer.
+ *
+ * @param serializer Serializer.
+ */
+ public void setSerializer(PortableSerializer serializer) {
+ this.serializer = serializer;
+ }
+
+ /**
+ * Gets types configuration.
+ *
+ * @return Types configuration.
+ */
+ public Collection<PortableTypeConfiguration> getTypeConfigurations() {
+ return typeCfgs;
+ }
+
+ /**
+ * Sets type configurations.
+ *
+ * @param typeCfgs Type configurations.
+ */
+ public void setTypeConfigurations(Collection<PortableTypeConfiguration> typeCfgs) {
+ this.typeCfgs = typeCfgs;
+ }
+
+ /**
+ * If {@code true} then date values converted to {@link Timestamp} on deserialization.
+ * <p>
+ * Default value is {@code true}.
+ *
+ * @return Flag indicating whether date values converted to {@link Timestamp} during unmarshalling.
+ */
+ public boolean isUseTimestamp() {
+ return useTs;
+ }
+
+ /**
+ * @param useTs Flag indicating whether date values converted to {@link Timestamp} during unmarshalling.
+ */
+ public void setUseTimestamp(boolean useTs) {
+ this.useTs = useTs;
+ }
+
+ /**
+ * Gets strings must be converted to or from bytes using UTF-8 encoding.
+ * <p>
+ * Default value is {@code true}.
+ *
+ * @return Flag indicating whether string must be converted to byte array using UTF-8 encoding.
+ */
+ public boolean isConvertStringToBytes() {
+ return convertString;
+ }
+
+ /**
+ * Sets strings must be converted to or from bytes using UTF-8 encoding.
+ * <p>
+ * Default value is {@code true}.
+ *
+ * @param convertString Flag indicating whether string must be converted to byte array using UTF-8 encoding.
+ */
+ public void setConvertStringToBytes(boolean convertString) {
+ this.convertString = convertString;
+ }
+
+ /**
+ * If {@code true}, meta data will be collected or all types. If you need to override this behaviour for
+ * some specific type, use {@link PortableTypeConfiguration#setMetaDataEnabled(Boolean)} method.
+ * <p>
+ * Default value if {@code true}.
+ *
+ * @return Whether meta data is collected.
+ */
+ public boolean isMetaDataEnabled() {
+ return metaDataEnabled;
+ }
+
+ /**
+ * @param metaDataEnabled Whether meta data is collected.
+ */
+ public void setMetaDataEnabled(boolean metaDataEnabled) {
+ this.metaDataEnabled = metaDataEnabled;
+ }
+
+ /**
+ * If {@code true}, {@link PortableObject} will cache deserialized instance after
+ * {@link PortableObject#deserialize()} is called. All consequent calls of this
+ * method on the same instance of {@link PortableObject} will return that cached
+ * value without actually deserializing portable object. If you need to override this
+ * behaviour for some specific type, use {@link PortableTypeConfiguration#setKeepDeserialized(Boolean)}
+ * method.
+ * <p>
+ * Default value if {@code true}.
+ *
+ * @return Whether deserialized value is kept.
+ */
+ public boolean isKeepDeserialized() {
+ return keepDeserialized;
+ }
+
+ /**
+ * @param keepDeserialized Whether deserialized value is kept.
+ */
+ public void setKeepDeserialized(boolean keepDeserialized) {
+ this.keepDeserialized = keepDeserialized;
+ }
+
+ /**
+ * Gets portable protocol version.
+ * <p>
+ * Defaults to {@link #DFLT_PORTABLE_PROTO_VER}.
+ *
+ * @return Portable protocol version.
+ */
+ public PortableProtocolVersion getProtocolVersion() {
+ return protoVer;
+ }
+
+ /**
+ * Sets portable protocol version.
+ * <p>
+ * Defaults to {@link #DFLT_PORTABLE_PROTO_VER}.
+ *
+ * @param protoVer Portable protocol version.
+ */
+ public void setProtocolVersion(PortableProtocolVersion protoVer) {
+ this.protoVer = protoVer;
+ }
+
+ /**
+ * Returns currently set {@link MarshallerContext}.
+ *
+ * @return Marshaller context.
+ */
+ public MarshallerContext getContext() {
+ return ctx;
+ }
+
+ /**
+ * Sets {@link GridPortableContext}.
+ * <p/>
+ * @param ctx Portable context.
+ */
+ private void setPortableContext(GridPortableContext ctx) {
+ ctx.configure(this);
+
+ impl = new GridPortableMarshaller(ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] marshal(@Nullable Object obj) throws IgniteCheckedException {
+ return impl.marshal(obj, 0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void marshal(@Nullable Object obj, OutputStream out) throws IgniteCheckedException {
+ byte[] arr = marshal(obj);
+
+ try {
+ out.write(arr);
+ }
+ catch (IOException e) {
+ throw new PortableException("Failed to marshal the object: " + obj, e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T unmarshal(byte[] bytes, @Nullable ClassLoader clsLdr) throws IgniteCheckedException {
+ return impl.deserialize(bytes, clsLdr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T unmarshal(InputStream in, @Nullable ClassLoader clsLdr) throws IgniteCheckedException {
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+
+ byte[] arr = new byte[4096];
+ int cnt;
+
+ // we have to fully read the InputStream because GridPortableMarshaller requires support of a method that
+ // returns number of bytes remaining.
+ try {
+ while ((cnt = in.read(arr)) != -1)
+ buffer.write(arr, 0, cnt);
+
+ buffer.flush();
+
+ return impl.deserialize(buffer.toByteArray(), clsLdr);
+ }
+ catch (IOException e) {
+ throw new PortableException("Failed to unmarshal the object from InputStream", e);
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1f2be19d/modules/core/src/main/java/org/apache/ignite/marshaller/portable/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/portable/package-info.java b/modules/core/src/main/java/org/apache/ignite/marshaller/portable/package-info.java
new file mode 100644
index 0000000..90cc5e6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/portable/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains portable marshaller API classes.
+ */
+package org.apache.ignite.marshaller.portable;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1f2be19d/modules/core/src/main/java/org/apache/ignite/portable/PortableBuilder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/portable/PortableBuilder.java b/modules/core/src/main/java/org/apache/ignite/portable/PortableBuilder.java
new file mode 100644
index 0000000..f6058cb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/portable/PortableBuilder.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.portable;
+
+import org.jetbrains.annotations.*;
+
+/**
+ * Portable object builder. Provides ability to build portable objects dynamically
+ * without having class definitions.
+ * <p>
+ * Here is an example of how a portable object can be built dynamically:
+ * <pre name=code class=java>
+ * GridPortableBuilder builder = Ignition.ignite().portables().builder("org.project.MyObject");
+ *
+ * builder.setField("fieldA", "A");
+ * builder.setField("fieldB", "B");
+ *
+ * GridPortableObject portableObj = builder.build();
+ * </pre>
+ *
+ * <p>
+ * Also builder can be initialized by existing portable object. This allows changing some fields without affecting
+ * other fields.
+ * <pre name=code class=java>
+ * GridPortableBuilder builder = Ignition.ignite().portables().builder(person);
+ *
+ * builder.setField("name", "John");
+ *
+ * person = builder.build();
+ * </pre>
+ * </p>
+ *
+ * If you need to modify nested portable object you can get builder for nested object using
+ * {@link #getField(String)}, changes made on nested builder will affect parent object,
+ * for example:
+ *
+ * <pre name=code class=java>
+ * GridPortableBuilder personBuilder = grid.portables().createBuilder(personPortableObj);
+ * GridPortableBuilder addressBuilder = personBuilder.setField("address");
+ *
+ * addressBuilder.setField("city", "New York");
+ *
+ * personPortableObj = personBuilder.build();
+ *
+ * // Should be "New York".
+ * String city = personPortableObj.getField("address").getField("city");
+ * </pre>
+ *
+ * @see GridPortables#builder(int)
+ * @see GridPortables#builder(String)
+ * @see GridPortables#builder(PortableObject)
+ */
+public interface PortableBuilder {
+ /**
+ * Returns value assigned to the specified field.
+ * If the value is a portable object instance of {@code GridPortableBuilder} will be returned,
+ * which can be modified.
+ * <p>
+ * Collections and maps returned from this method are modifiable.
+ *
+ * @param name Field name.
+ * @return Filed value.
+ */
+ public <T> T getField(String name);
+
+ /**
+ * Sets field value.
+ *
+ * @param name Field name.
+ * @param val Field value (cannot be {@code null}).
+ * @see PortableObject#metaData()
+ */
+ public PortableBuilder setField(String name, Object val);
+
+ /**
+ * Sets field value with value type specification.
+ * <p>
+ * Field type is needed for proper metadata update.
+ *
+ * @param name Field name.
+ * @param val Field value.
+ * @param type Field type.
+ * @see PortableObject#metaData()
+ */
+ public <T> PortableBuilder setField(String name, @Nullable T val, Class<? super T> type);
+
+ /**
+ * Sets field value.
+ * <p>
+ * This method should be used if field is portable object.
+ *
+ * @param name Field name.
+ * @param builder Builder for object field.
+ */
+ public PortableBuilder setField(String name, @Nullable PortableBuilder builder);
+
+ /**
+ * Removes field from this builder.
+ *
+ * @param fieldName Field name.
+ * @return {@code this} instance for chaining.
+ */
+ public PortableBuilder removeField(String fieldName);
+
+ /**
+ * Sets hash code for resulting portable object returned by {@link #build()} method.
+ * <p>
+ * If not set {@code 0} is used.
+ *
+ * @param hashCode Hash code.
+ * @return {@code this} instance for chaining.
+ */
+ public PortableBuilder hashCode(int hashCode);
+
+ /**
+ * Builds portable object.
+ *
+ * @return Portable object.
+ * @throws PortableException In case of error.
+ */
+ public PortableObject build() throws PortableException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1f2be19d/modules/core/src/main/java/org/apache/ignite/portable/PortableException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/portable/PortableException.java b/modules/core/src/main/java/org/apache/ignite/portable/PortableException.java
new file mode 100644
index 0000000..62ae901
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/portable/PortableException.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.portable;
+
+import org.apache.ignite.*;
+
+import org.jetbrains.annotations.*;
+
+/**
+ * Exception indicating portable object serialization error.
+ */
+public class PortableException extends IgniteException {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Creates portable exception with error message.
+ *
+ * @param msg Error message.
+ */
+ public PortableException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * Creates portable exception with {@link Throwable} as a cause.
+ *
+ * @param cause Cause.
+ */
+ public PortableException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Creates portable exception with error message and {@link Throwable} as a cause.
+ *
+ * @param msg Error message.
+ * @param cause Cause.
+ */
+ public PortableException(String msg, @Nullable Throwable cause) {
+ super(msg, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1f2be19d/modules/core/src/main/java/org/apache/ignite/portable/PortableIdMapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/portable/PortableIdMapper.java b/modules/core/src/main/java/org/apache/ignite/portable/PortableIdMapper.java
new file mode 100644
index 0000000..96452cd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/portable/PortableIdMapper.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (C) GridGain Systems. All Rights Reserved.
+ * _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.portable;
+
+/**
+ * Type and field ID mapper for portable objects. Ignite never writes full
+ * strings for field or type names. Instead, for performance reasons, Ignite
+ * writes integer hash codes for type and field names. It has been tested that
+ * hash code conflicts for the type names or the field names
+ * within the same type are virtually non-existent and, to gain performance, it is safe
+ * to work with hash codes. For the cases when hash codes for different types or fields
+ * actually do collide {@code GridPortableIdMapper} allows to override the automatically
+ * generated hash code IDs for the type and field names.
+ * <p>
+ * Portable ID mapper can be configured for all portable objects via
+ * {@link org.apache.ignite.marshaller.portable.PortableMarshaller#getIdMapper()} method, or for a specific
+ * portable type via {@link PortableTypeConfiguration#getIdMapper()} method.
+ */
+public interface PortableIdMapper {
+ /**
+ * Gets type ID for provided class name.
+ * <p>
+ * If {@code 0} is returned, hash code of class simple name will be used.
+ *
+ * @param clsName Class name.
+ * @return Type ID.
+ */
+ public int typeId(String clsName);
+
+ /**
+ * Gets ID for provided field.
+ * <p>
+ * If {@code 0} is returned, hash code of field name will be used.
+ *
+ * @param typeId Type ID.
+ * @param fieldName Field name.
+ * @return Field ID.
+ */
+ public int fieldId(int typeId, String fieldName);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1f2be19d/modules/core/src/main/java/org/apache/ignite/portable/PortableInvalidClassException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/portable/PortableInvalidClassException.java b/modules/core/src/main/java/org/apache/ignite/portable/PortableInvalidClassException.java
new file mode 100644
index 0000000..152c0fd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/portable/PortableInvalidClassException.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright (C) GridGain Systems. All Rights Reserved.
+ * _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.portable;
+
+import org.jetbrains.annotations.*;
+
+/**
+ * Exception indicating that class needed for deserialization of portable object does not exist.
+ * <p>
+ * Thrown from {@link PortableObject#deserialize()} method.
+ */
+public class PortableInvalidClassException extends PortableException {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Creates invalid class exception with error message.
+ *
+ * @param msg Error message.
+ */
+ public PortableInvalidClassException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * Creates invalid class exception with {@link Throwable} as a cause.
+ *
+ * @param cause Cause.
+ */
+ public PortableInvalidClassException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Creates invalid class exception with error message and {@link Throwable} as a cause.
+ *
+ * @param msg Error message.
+ * @param cause Cause.
+ */
+ public PortableInvalidClassException(String msg, @Nullable Throwable cause) {
+ super(msg, cause);
+ }
+}