You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/11/16 09:51:06 UTC
[07/12] ignite git commit: IGNITE-1282: Refactoring.
IGNITE-1282: Refactoring.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0fa8992b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0fa8992b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0fa8992b
Branch: refs/heads/ignite-1917
Commit: 0fa8992b08b867548bfabaebd5659c1f0bcac773
Parents: 2a60909
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Nov 16 09:27:22 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Nov 16 09:27:22 2015 +0300
----------------------------------------------------------------------
.../internal/portable/BinaryWriterExImpl.java | 9 +-
.../streams/PortableHeapOutputStream.java | 4 +-
.../streams/PortableMemoryAllocator.java | 102 +---------------
.../streams/PortableMemoryAllocatorChunk.java | 115 +++++++++++++++++++
4 files changed, 127 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0fa8992b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryWriterExImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryWriterExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryWriterExImpl.java
index f4cb1e6..ffa89b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryWriterExImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryWriterExImpl.java
@@ -24,6 +24,7 @@ import org.apache.ignite.binary.BinaryRawWriter;
import org.apache.ignite.binary.BinaryWriter;
import org.apache.ignite.internal.portable.streams.PortableHeapOutputStream;
import org.apache.ignite.internal.portable.streams.PortableMemoryAllocator;
+import org.apache.ignite.internal.portable.streams.PortableMemoryAllocatorChunk;
import org.apache.ignite.internal.portable.streams.PortableOutputStream;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.jetbrains.annotations.Nullable;
@@ -152,7 +153,8 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
};
private static class TLSContext {
- public PortableMemoryAllocator.Chunk chunk = PortableMemoryAllocator.INSTANCE.chunk();
+
+ public PortableMemoryAllocatorChunk chunk = PortableMemoryAllocator.INSTANCE.chunk();
public SchemaHolder schema = new SchemaHolder();
}
@@ -408,11 +410,8 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
* Pop schema.
*/
public void popSchema() {
- if (schema != null) {
- assert fieldCnt > 0;
-
+ if (fieldCnt > 0)
schema.pop(fieldCnt);
- }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/0fa8992b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java
index 49c5744..51d6c85 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java
@@ -22,7 +22,7 @@ package org.apache.ignite.internal.portable.streams;
*/
public final class PortableHeapOutputStream extends PortableAbstractOutputStream {
/** Allocator. */
- private final PortableMemoryAllocator.Chunk chunk;
+ private final PortableMemoryAllocatorChunk chunk;
/** Data. */
private byte[] data;
@@ -42,7 +42,7 @@ public final class PortableHeapOutputStream extends PortableAbstractOutputStream
* @param cap Capacity.
* @param chunk Chunk.
*/
- public PortableHeapOutputStream(int cap, PortableMemoryAllocator.Chunk chunk) {
+ public PortableHeapOutputStream(int cap, PortableMemoryAllocatorChunk chunk) {
this.chunk = chunk;
data = chunk.allocate(cap);
http://git-wip-us.apache.org/repos/asf/ignite/blob/0fa8992b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java
index e4e0dbf..e16747b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java
@@ -17,12 +17,6 @@
package org.apache.ignite.internal.portable.streams;
-import org.apache.ignite.internal.util.GridUnsafe;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import sun.misc.Unsafe;
-
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_RECHECK;
-
/**
* Thread-local memory allocator.
*/
@@ -31,13 +25,7 @@ public final class PortableMemoryAllocator {
public static final PortableMemoryAllocator INSTANCE = new PortableMemoryAllocator();
/** Holders. */
- private static final ThreadLocal<Chunk> holders = new ThreadLocal<>();
-
- /** Unsafe instance. */
- protected static final Unsafe UNSAFE = GridUnsafe.unsafe();
-
- /** Array offset: byte. */
- protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
+ private static final ThreadLocal<PortableMemoryAllocatorChunk> holders = new ThreadLocal<>();
/**
* Ensures singleton.
@@ -46,11 +34,11 @@ public final class PortableMemoryAllocator {
// No-op.
}
- public Chunk chunk() {
- Chunk holder = holders.get();
+ public PortableMemoryAllocatorChunk chunk() {
+ PortableMemoryAllocatorChunk holder = holders.get();
if (holder == null)
- holders.set(holder = new Chunk());
+ holders.set(holder = new PortableMemoryAllocatorChunk());
return holder;
}
@@ -62,86 +50,8 @@ public final class PortableMemoryAllocator {
* @return {@code true} if acquired {@code false} otherwise.
*/
public boolean isAcquired() {
- Chunk holder = holders.get();
-
- return holder != null && holder.acquired;
- }
-
- /**
- * Thread-local byte array holder.
- */
- public static class Chunk {
- /** */
- private static final Long CHECK_FREQ = Long.getLong(IGNITE_MARSHAL_BUFFERS_RECHECK, 10000);
-
- /** Data array */
- private byte[] data;
-
- /** Max message size detected between checks. */
- private int maxMsgSize;
-
- /** Last time array size is checked. */
- private long lastCheck = U.currentTimeMillis();
-
- /** Whether the holder is acquired or not. */
- private boolean acquired;
-
- /**
- * Allocate.
- *
- * @param size Desired size.
- * @return Data.
- */
- public byte[] allocate(int size) {
- if (acquired)
- return new byte[size];
-
- acquired = true;
-
- if (data == null || size > data.length)
- data = new byte[size];
-
- return data;
- }
-
- /**
- * Reallocate.
- *
- * @param data Old data.
- * @param size Size.
- * @return New data.
- */
- public byte[] reallocate(byte[] data, int size) {
- byte[] newData = new byte[size];
-
- if (this.data == data)
- this.data = newData;
-
- UNSAFE.copyMemory(data, BYTE_ARR_OFF, newData, BYTE_ARR_OFF, data.length);
-
- return newData;
- }
-
- /**
- * Shrinks array size if needed.
- */
- public void release(byte[] data, int maxMsgSize) {
- if (this.data != data)
- return;
-
- this.maxMsgSize = maxMsgSize;
- this.acquired = false;
-
- long now = U.currentTimeMillis();
-
- if (now - this.lastCheck >= CHECK_FREQ) {
- int halfSize = data.length >> 1;
-
- if (this.maxMsgSize < halfSize)
- this.data = new byte[halfSize];
+ PortableMemoryAllocatorChunk holder = holders.get();
- this.lastCheck = now;
- }
- }
+ return holder != null && holder.isAcquired();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/0fa8992b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocatorChunk.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocatorChunk.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocatorChunk.java
new file mode 100644
index 0000000..48848e0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocatorChunk.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import sun.misc.Unsafe;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_RECHECK;
+
+/**
+ * Memory allocator chunk.
+ */
+public class PortableMemoryAllocatorChunk {
+ /** Unsafe instance. */
+ protected static final Unsafe UNSAFE = GridUnsafe.unsafe();
+
+ /** Array offset: byte. */
+ protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
+
+ /** Buffer size re-check frequency. */
+ private static final Long CHECK_FREQ = Long.getLong(IGNITE_MARSHAL_BUFFERS_RECHECK, 10000);
+
+ /** Data array */
+ private byte[] data;
+
+ /** Max message size detected between checks. */
+ private int maxMsgSize;
+
+ /** Last time array size is checked. */
+ private long lastCheck = U.currentTimeMillis();
+
+ /** Whether the holder is acquired or not. */
+ private boolean acquired;
+
+ /**
+ * Allocate.
+ *
+ * @param size Desired size.
+ * @return Data.
+ */
+ public byte[] allocate(int size) {
+ if (acquired)
+ return new byte[size];
+
+ acquired = true;
+
+ if (data == null || size > data.length)
+ data = new byte[size];
+
+ return data;
+ }
+
+ /**
+ * Reallocate.
+ *
+ * @param data Old data.
+ * @param size Size.
+ * @return New data.
+ */
+ public byte[] reallocate(byte[] data, int size) {
+ byte[] newData = new byte[size];
+
+ if (this.data == data)
+ this.data = newData;
+
+ UNSAFE.copyMemory(data, BYTE_ARR_OFF, newData, BYTE_ARR_OFF, data.length);
+
+ return newData;
+ }
+
+ /**
+ * Shrinks array size if needed.
+ */
+ public void release(byte[] data, int maxMsgSize) {
+ if (this.data != data)
+ return;
+
+ this.maxMsgSize = maxMsgSize;
+ this.acquired = false;
+
+ long now = U.currentTimeMillis();
+
+ if (now - this.lastCheck >= CHECK_FREQ) {
+ int halfSize = data.length >> 1;
+
+ if (this.maxMsgSize < halfSize)
+ this.data = new byte[halfSize];
+
+ this.lastCheck = now;
+ }
+ }
+
+ /**
+ * @return {@code True} if acquired.
+ */
+ public boolean isAcquired() {
+ return acquired;
+ }
+}