You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/10/24 20:46:37 UTC
[3/3] apex-malhar git commit: APEXMALHAR-2190 Use reusable buffer for
serialization in spillable data structures closes #404
APEXMALHAR-2190 Use reusable buffer for serialization in spillable data structures closes #404
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/6ddefd02
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/6ddefd02
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/6ddefd02
Branch: refs/heads/master
Commit: 6ddefd02ac72168b17c3dedb47af7d91c23c3574
Parents: 3799157
Author: brightchen <br...@datatorrent.com>
Authored: Mon Aug 15 17:46:27 2016 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Mon Oct 24 13:43:02 2016 -0700
----------------------------------------------------------------------
.../spillable/SpillableBenchmarkApp.java | 69 +++++
.../spillable/SpillableTestInputOperator.java | 46 ++++
.../spillable/SpillableTestOperator.java | 189 ++++++++++++++
.../spillable/SpillableBenchmarkAppTester.java | 73 ++++++
.../spillable/SpillableDSBenchmarkTest.java | 171 +++++++++++++
.../state/ManagedStateBenchmarkAppTest.java | 101 ++++++++
.../state/ManagedStateBenchmarkAppTester.java | 101 --------
benchmark/src/test/resources/log4j.properties | 2 +
.../state/managed/AbstractManagedStateImpl.java | 34 ++-
.../apex/malhar/lib/state/managed/Bucket.java | 85 +++++--
.../lib/state/managed/BucketProvider.java | 40 +++
.../state/spillable/SpillableArrayListImpl.java | 17 +-
.../SpillableArrayListMultimapImpl.java | 53 ++--
.../spillable/SpillableComplexComponent.java | 29 +--
.../SpillableComplexComponentImpl.java | 64 +++--
.../lib/state/spillable/SpillableMapImpl.java | 44 ++--
.../lib/state/spillable/SpillableSetImpl.java | 45 +---
.../spillable/SpillableSetMultimapImpl.java | 45 ++--
.../state/spillable/SpillableStateStore.java | 3 +-
.../state/spillable/WindowBoundedMapCache.java | 5 +-
.../inmem/InMemSpillableStateStore.java | 26 ++
.../utils/serde/AffixKeyValueSerdeManager.java | 76 ++++++
.../apex/malhar/lib/utils/serde/AffixSerde.java | 68 +++++
.../apex/malhar/lib/utils/serde/ArraySerde.java | 97 ++++++++
.../apex/malhar/lib/utils/serde/Block.java | 217 ++++++++++++++++
.../lib/utils/serde/BlockReleaseStrategy.java | 47 ++++
.../malhar/lib/utils/serde/BlockStream.java | 179 +++++++++++++
.../malhar/lib/utils/serde/BufferSlice.java | 100 ++++++++
.../malhar/lib/utils/serde/CollectionSerde.java | 97 ++++++++
.../serde/DefaultBlockReleaseStrategy.java | 96 +++++++
.../malhar/lib/utils/serde/GenericSerde.java | 81 ++++++
.../apex/malhar/lib/utils/serde/IntSerde.java | 45 ++++
.../utils/serde/KeyValueByteStreamProvider.java | 37 +++
.../lib/utils/serde/KeyValueSerdeManager.java | 86 +++++++
.../apex/malhar/lib/utils/serde/LongSerde.java | 45 ++++
.../apex/malhar/lib/utils/serde/PairSerde.java | 73 ++++++
.../lib/utils/serde/PassThruByteArraySerde.java | 51 ----
.../serde/PassThruByteArraySliceSerde.java | 61 -----
.../lib/utils/serde/PassThruSliceSerde.java | 32 ++-
.../apex/malhar/lib/utils/serde/Serde.java | 41 +--
.../lib/utils/serde/SerdeCollectionSlice.java | 120 ---------
.../malhar/lib/utils/serde/SerdeIntSlice.java | 54 ----
.../malhar/lib/utils/serde/SerdeKryoSlice.java | 100 --------
.../malhar/lib/utils/serde/SerdeLongSlice.java | 54 ----
.../malhar/lib/utils/serde/SerdePairSlice.java | 89 -------
.../lib/utils/serde/SerdeStringSlice.java | 55 ----
.../lib/utils/serde/SerializationBuffer.java | 130 ++++++++++
.../apex/malhar/lib/utils/serde/SliceUtils.java | 10 +
.../malhar/lib/utils/serde/StringSerde.java | 45 ++++
.../lib/utils/serde/WindowCompleteListener.java | 29 +++
.../lib/utils/serde/WindowedBlockStream.java | 249 +++++++++++++++++++
.../impl/SpillableSessionWindowedStorage.java | 3 +-
.../impl/SpillableWindowedKeyedStorage.java | 28 +--
.../impl/SpillableWindowedPlainStorage.java | 18 +-
.../com/datatorrent/lib/util/TestUtils.java | 3 +-
.../lib/state/managed/DefaultBucketTest.java | 48 +++-
.../state/managed/ManagedStateTestUtils.java | 3 +-
.../spillable/SpillableArrayListImplTest.java | 12 +-
.../SpillableArrayListMultimapImplTest.java | 30 ++-
.../SpillableComplexComponentImplTest.java | 6 +-
.../state/spillable/SpillableMapImplTest.java | 39 ++-
.../state/spillable/SpillableSetImplTest.java | 4 +-
.../spillable/SpillableSetMultimapImplTest.java | 18 +-
.../lib/state/spillable/SpillableTestUtils.java | 46 ++--
.../spillable/TimeBasedPriorityQueueTest.java | 3 -
.../malhar/lib/utils/serde/AffixSerdeTest.java | 43 ++++
.../malhar/lib/utils/serde/BlockStreamTest.java | 179 +++++++++++++
.../lib/utils/serde/CollectionSerdeTest.java | 68 +++++
.../lib/utils/serde/GenericSerdeTest.java | 84 +++++++
.../malhar/lib/utils/serde/PairSerdeTest.java | 48 ++++
.../utils/serde/PassThruByteArraySerdeTest.java | 72 ------
.../utils/serde/SerdeCollectionSliceTest.java | 65 -----
.../lib/utils/serde/SerdeGeneralTest.java | 169 +++++++++++++
.../lib/utils/serde/SerdeKryoSliceTest.java | 79 ------
.../lib/utils/serde/SerdePairSliceTest.java | 44 ----
.../window/SpillableWindowedStorageTest.java | 17 +-
76 files changed, 3570 insertions(+), 1265 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkApp.java
new file mode 100644
index 0000000..e2fe8bb
--- /dev/null
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkApp.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.spillable;
+
+import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+
+/**
+ * Benchmark application that wires a {@link SpillableTestInputOperator} to a
+ * {@link SpillableTestOperator} whose state lives in a {@link ManagedStateSpillableStateStore}.
+ */
+@ApplicationAnnotation(name = "SpillableBenchmarkApp")
+public class SpillableBenchmarkApp implements StreamingApplication
+{
+  /** Configuration key under which the base path of the state store is looked up. */
+  protected static final String PROP_STORE_PATH = "dt.application.SpillableBenchmarkApp.storeBasePath";
+
+  /**
+   * Adds the input operator and the spillable test operator to the DAG and connects them
+   * with a container-local stream.
+   *
+   * @param dag the DAG to populate
+   * @param conf configuration used to resolve the store base path
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    // Create the input operator that generates the test tuples
+    SpillableTestInputOperator input = new SpillableTestInputOperator();
+    input.batchSize = 100;
+    input.sleepBetweenBatch = 0;
+    input = dag.addOperator("input", input);
+
+    // Create the operator under test and give it its backing store
+    SpillableTestOperator testOperator = new SpillableTestOperator();
+    testOperator.store = createStore(conf);
+    testOperator.shutdownCount = -1;
+    testOperator = dag.addOperator("test", testOperator);
+
+    // Connect ports
+    dag.addStream("stream", input.output, testOperator.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
+  }
+
+  /**
+   * Creates the managed-state backed store and points its file access at the configured base path.
+   *
+   * @param conf configuration used to resolve the store base path
+   * @return the configured store
+   */
+  public ManagedStateSpillableStateStore createStore(Configuration conf)
+  {
+    String basePath = getStoreBasePath(conf);
+    ManagedStateSpillableStateStore store = new ManagedStateSpillableStateStore();
+    // NOTE(review): assumes the store's default file access is a TFileImpl.DTFileImpl - confirm
+    ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath(basePath);
+    return store;
+  }
+
+  /**
+   * Reads the store base path from the configuration.
+   *
+   * @param conf configuration to read {@link #PROP_STORE_PATH} from
+   * @return the configured base path
+   * @throws NullPointerException if the property is not set
+   */
+  public String getStoreBasePath(Configuration conf)
+  {
+    return Preconditions.checkNotNull(conf.get(PROP_STORE_PATH),
+        "base path should be specified in the properties.xml");
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java
new file mode 100644
index 0000000..2e33721
--- /dev/null
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.spillable;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Input operator for the spillable benchmark. Each call to {@link #emitTuples()} emits
+ * {@link #batchSize} consecutive counter values as strings and then optionally sleeps.
+ */
+public class SpillableTestInputOperator extends BaseOperator implements InputOperator
+{
+  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
+  /** Last value emitted; incremented before every emit. */
+  public long count = 0;
+  /** Number of tuples emitted per call to {@link #emitTuples()}. */
+  public int batchSize = 100;
+  /** Milliseconds to sleep after each batch; values less than or equal to zero disable the sleep. */
+  public int sleepBetweenBatch = 1;
+
+  /**
+   * Emits one batch of tuples and sleeps for {@link #sleepBetweenBatch} milliseconds if that is positive.
+   */
+  @Override
+  public void emitTuples()
+  {
+    for (int i = 0; i < batchSize; ++i) {
+      output.emit("" + ++count);
+    }
+    if (sleepBetweenBatch > 0) {
+      try {
+        Thread.sleep(sleepBetweenBatch);
+      } catch (InterruptedException e) {
+        // Thread.sleep clears the interrupt status when it throws; restore it so the
+        // caller can still observe the interruption instead of losing it silently.
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
new file mode 100644
index 0000000..3c5bf71
--- /dev/null
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.spillable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.spillable.SpillableArrayListImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableArrayListMultimapImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableMapImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
+import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.LongSerde;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ShutdownException;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Benchmark operator that stores every incoming tuple in a spillable multimap keyed by the
+ * current window id, records the per-window tuple count in a spillable map, and periodically
+ * verifies that both structures agree.
+ */
+public class SpillableTestOperator extends BaseOperator implements Operator.CheckpointNotificationListener
+{
+  private static final Logger logger = LoggerFactory.getLogger(SpillableTestOperator.class);
+
+  public static final byte[] ID1 = new byte[] {(byte)1};
+  public static final byte[] ID2 = new byte[] {(byte)2};
+  public static final byte[] ID3 = new byte[] {(byte)3};
+
+  /** Window id (as string) to the tuples received in that window. */
+  public SpillableArrayListMultimapImpl<String, String> multiMap;
+
+  public ManagedStateSpillableStateStore store;
+
+  public long totalCount = 0;
+  public transient long countInWindow;
+  /** First window id seen by {@link #beginWindow(long)}; negative until then. */
+  public long minWinId = -1;
+  public long committedWinId = -1;
+  public long windowId;
+
+  /** Window id to the number of tuples processed in that window. */
+  public SpillableMapImpl<Long, Long> windowToCount;
+
+  /** When {@link #totalCount} reaches this value, {@link #processTuple(String)} throws. */
+  public long shutdownCount = -1;
+
+  /** Set by {@link #checkData()} when a verification fails. */
+  public static Throwable errorTrace;
+
+  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
+  {
+    @Override
+    public void process(String tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * Counts the tuple and appends it to the multimap entry of the current window.
+   *
+   * @param tuple the incoming tuple
+   * @throws RuntimeException when the total count reaches {@link #shutdownCount}
+   */
+  public void processTuple(String tuple)
+  {
+    if (++totalCount == shutdownCount) {
+      throw new RuntimeException("Test recovery. count = " + totalCount);
+    }
+    countInWindow++;
+    multiMap.put("" + windowId, tuple);
+  }
+
+  /**
+   * Creates the spillable structures if they are not set yet, sets up the store and the
+   * multimap, then verifies the data that is already present.
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+    if (windowToCount == null) {
+      windowToCount = createWindowToCountMap(store);
+    }
+    if (multiMap == null) {
+      multiMap = createMultimap(store);
+    }
+
+    store.setup(context);
+    multiMap.setup(context);
+    // NOTE(review): windowToCount never receives setup(context) here, nor beginWindow(...)
+    // in beginWindow(long), although endWindow() is called on it - confirm this is intended.
+
+    checkData();
+  }
+
+  /**
+   * Verifies, for every window from max(committedWinId + 1, minWinId) up to but excluding the
+   * current window, that a count and a tuple list are both stored and that the list size
+   * equals the count.
+   *
+   * @throws ShutdownException when a window fails the check; the reason is kept in {@link #errorTrace}
+   */
+  public void checkData()
+  {
+    long startTime = System.currentTimeMillis();
+    logger.debug("check data: totalCount: {}; minWinId: {}; committedWinId: {}; curWinId: {}", totalCount,
+        this.minWinId, committedWinId, this.windowId);
+    for (long winId = Math.max(committedWinId + 1, minWinId); winId < this.windowId; ++winId) {
+      Long count = this.windowToCount.get(winId);
+      SpillableArrayListImpl<String> datas = (SpillableArrayListImpl<String>)multiMap.get("" + winId);
+      // Either side missing (or both) is a failure.
+      if (datas == null || count == null) {
+        throw recordFailure("Invalid data/count. datas: " + datas + "; count: " + count);
+      }
+      int dataSize = datas.size();
+      if (count.longValue() != dataSize) {
+        throw recordFailure(String.format("data size not equal: window Id: %d; datas size: %d; count: %d",
+            winId, dataSize, count));
+      }
+    }
+    logger.info("check data took {} millis.", System.currentTimeMillis() - startTime);
+  }
+
+  /**
+   * Logs the failure, stores it in {@link #errorTrace} and returns the exception to throw.
+   */
+  private static ShutdownException recordFailure(String msg)
+  {
+    logger.error(msg);
+    errorTrace = new RuntimeException(msg);
+    return new ShutdownException();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    store.beginWindow(windowId);
+    multiMap.beginWindow(windowId);
+    if (minWinId < 0) {
+      minWinId = windowId;
+    }
+
+    this.windowId = windowId;
+    countInWindow = 0;
+  }
+
+  /**
+   * Closes the window on the multimap, records the window's tuple count, closes the window
+   * on the count map and the store, and runs {@link #checkData()} on every tenth window id.
+   */
+  @Override
+  public void endWindow()
+  {
+    multiMap.endWindow();
+    windowToCount.put(windowId, countInWindow);
+    windowToCount.endWindow();
+    store.endWindow();
+
+    if (windowId % 10 == 0) {
+      checkData();
+    }
+  }
+
+  @Override
+  public void beforeCheckpoint(long windowId)
+  {
+    store.beforeCheckpoint(windowId);
+  }
+
+  @Override
+  public void checkpointed(long windowId)
+  {
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+    this.committedWinId = windowId;
+    store.committed(windowId);
+  }
+
+  /**
+   * Creates the String-to-String list multimap on the given store under identifier {@link #ID1}.
+   */
+  public static SpillableArrayListMultimapImpl<String, String> createMultimap(SpillableStateStore store)
+  {
+    return new SpillableArrayListMultimapImpl<String, String>(store, ID1, 0L, new StringSerde(),
+        new StringSerde());
+  }
+
+  /**
+   * Creates a String-to-String map on the given store under identifier {@link #ID2}.
+   */
+  public static SpillableMapImpl<String, String> createMap(SpillableStateStore store)
+  {
+    return new SpillableMapImpl<String, String>(store, ID2, 0L, new StringSerde(),
+        new StringSerde());
+  }
+
+  /**
+   * Creates the Long-to-Long window count map on the given store under identifier {@link #ID3}.
+   */
+  public static SpillableMapImpl<Long, Long> createWindowToCountMap(SpillableStateStore store)
+  {
+    return new SpillableMapImpl<Long, Long>(store, ID3, 0L, new LongSerde(),
+        new LongSerde());
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java
new file mode 100644
index 0000000..7f94079
--- /dev/null
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.spillable;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+
+/**
+ * Runs {@link SpillableBenchmarkApp} in local mode for one minute and fails if the test
+ * operator recorded a verification error.
+ */
+public class SpillableBenchmarkAppTester extends SpillableBenchmarkApp
+{
+  private static final Logger logger = LoggerFactory.getLogger(SpillableBenchmarkAppTester.class);
+  public static final String basePath = "target/temp";
+
+  @Test
+  public void test() throws Exception
+  {
+    LocalMode localMode = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+
+    // The DAG is populated directly; the application handed to prepareDAG adds nothing.
+    super.populateDAG(localMode.getDAG(), conf);
+    localMode.prepareDAG(emptyApplication(), conf);
+
+    // Create local cluster
+    final LocalMode.Controller controller = localMode.getController();
+    controller.run(60000);
+
+    controller.shutdown();
+
+    if (SpillableTestOperator.errorTrace == null) {
+      return;
+    }
+    logger.error("Error.", SpillableTestOperator.errorTrace);
+    Assert.assertNull(SpillableTestOperator.errorTrace.getMessage(), SpillableTestOperator.errorTrace);
+  }
+
+  /**
+   * Returns an application whose populateDAG does nothing.
+   */
+  private static StreamingApplication emptyApplication()
+  {
+    return new StreamingApplication()
+    {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf)
+      {
+      }
+    };
+  }
+
+  /**
+   * Uses the fixed local {@link #basePath} instead of reading the configuration.
+   */
+  @Override
+  public String getStoreBasePath(Configuration conf)
+  {
+    return basePath;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java
new file mode 100644
index 0000000..7e64c5f
--- /dev/null
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.spillable;
+
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.spillable.SpillableMapImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableTestUtils;
+import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
+
+import com.datatorrent.lib.fileaccess.TFileImpl;
+
+
+/**
+ * Benchmark for {@link SpillableMapImpl}: repeatedly puts random key/value pairs drawn from
+ * pre-generated pools while driving the store through windows, checkpoints and commits.
+ */
+public class SpillableDSBenchmarkTest
+{
+  private static final Logger logger = LoggerFactory.getLogger(SpillableDSBenchmarkTest.class);
+  protected static final int loopCount = 100000000;
+  protected static final long oneMB = 1024 * 1024;
+  protected static final int keySize = 500000;
+  protected static final int valueSize = 100000;
+  protected static final int maxKeyLength = 100;
+  protected static final int maxValueLength = 1000;
+
+  protected static final int tuplesPerWindow = 10000;
+  protected static final int checkPointWindows = 10;
+  protected static final int commitDelays = 100;
+
+  protected final transient Random random = new Random();
+  protected String[] keys;
+  protected String[] values;
+
+  @Rule
+  public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+
+  /**
+   * Pre-generates the pools of random keys and values used by {@link #putEntry(SpillableMapImpl)}.
+   */
+  @Before
+  public void setup()
+  {
+    keys = new String[keySize];
+    for (int i = 0; i < keys.length; ++i) {
+      keys[i] = this.randomString(maxKeyLength);
+    }
+
+    values = new String[valueSize];
+    for (int i = 0; i < values.length; ++i) {
+      values[i] = this.randomString(maxValueLength);
+    }
+  }
+
+  /**
+   * Puts {@link #loopCount} entries into a spillable map, closing a window every
+   * {@link #tuplesPerWindow} puts, checkpointing every {@link #checkPointWindows} windows and
+   * committing with a lag of {@link #commitDelays} windows. Progress is logged roughly every
+   * five seconds.
+   */
+  @Test
+  public void testSpillableMap()
+  {
+    byte[] ID1 = new byte[]{(byte)1};
+    ManagedStateSpillableStateStore store = new ManagedStateSpillableStateStore();
+    ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath("target/temp");
+
+    StringSerde keySerde = createKeySerde();
+    Serde<String> valueSerde = createValueSerde();
+
+    SpillableMapImpl<String, String> map = new SpillableMapImpl<String, String>(store, ID1, 0L, keySerde, valueSerde);
+    store.setup(testMeta.operatorContext);
+    map.setup(testMeta.operatorContext);
+
+    final long startTime = System.currentTimeMillis();
+
+    long windowId = 0;
+    store.beginWindow(++windowId);
+    map.beginWindow(windowId);
+
+    int outputTimes = 0;
+    for (int i = 0; i < loopCount; ++i) {
+      putEntry(map);
+
+      if (i % tuplesPerWindow == 0) {
+        map.endWindow();
+        store.endWindow();
+
+        if (i % (tuplesPerWindow * checkPointWindows) == 0) {
+          store.beforeCheckpoint(windowId);
+
+          if (windowId > commitDelays) {
+            store.committed(windowId - commitDelays);
+          }
+        }
+
+        //next window
+        store.beginWindow(++windowId);
+        map.beginWindow(windowId);
+      }
+
+      long spentTime = System.currentTimeMillis() - startTime;
+      // spentTime is strictly positive inside this branch, so the division below is safe.
+      if (spentTime > outputTimes * 5000) {
+        ++outputTimes;
+        // Multiply as long: i * 1000 in int arithmetic overflows once i exceeds ~2.1 million.
+        logger.info("Total Statistics: Spent {} mills for {} operation. average/second: {}", spentTime, i,
+            i * 1000L / spentTime);
+        checkEnvironment();
+      }
+    }
+    long spentTime = System.currentTimeMillis() - startTime;
+
+    // Note: this average is operations per millisecond, unlike the per-second figure above.
+    logger.info("Spent {} mills for {} operation. average: {}", spentTime, loopCount,
+        loopCount / spentTime);
+  }
+
+  /**
+   * Puts one randomly chosen key/value pair from the pre-generated pools into the map.
+   */
+  public void putEntry(SpillableMapImpl<String, String> map)
+  {
+    map.put(keys[random.nextInt(keys.length)], values[random.nextInt(values.length)]);
+  }
+
+  /** Alphabet for random strings: digits, upper-case and lower-case ASCII letters. */
+  public static final String characters = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+
+  /** Shared scratch buffer for {@link #randomString(int)}; sized for the longest key or value. */
+  protected static final char[] text = new char[Math.max(maxKeyLength, maxValueLength)];
+
+  /**
+   * Builds a random string of exactly {@code length} characters from {@link #characters}.
+   *
+   * @param length number of characters; must not exceed the size of the scratch buffer
+   * @return the generated string
+   */
+  public String randomString(int length)
+  {
+    for (int i = 0; i < length; i++) {
+      text[i] = characters.charAt(random.nextInt(characters.length()));
+    }
+    return new String(text, 0, length);
+  }
+
+  /**
+   * Logs the JVM memory figures in megabytes and fails the test when the heap is fully
+   * allocated and less than 10 MB of it is free.
+   */
+  public void checkEnvironment()
+  {
+    Runtime runtime = Runtime.getRuntime();
+
+    long maxMemory = runtime.maxMemory() / oneMB;
+    long allocatedMemory = runtime.totalMemory() / oneMB;
+    long freeMemory = runtime.freeMemory() / oneMB;
+
+    logger.info("freeMemory: {}M; allocatedMemory: {}M; maxMemory: {}M", freeMemory,
+        allocatedMemory, maxMemory);
+
+    Assert.assertFalse("Run out of memory.", allocatedMemory == maxMemory && freeMemory < 10);
+  }
+
+  /** Creates the serde used for map keys; subclasses may override. */
+  protected StringSerde createKeySerde()
+  {
+    return new StringSerde();
+  }
+
+  /** Creates the serde used for map values; subclasses may override. */
+  protected Serde<String> createValueSerde()
+  {
+    return new StringSerde();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
new file mode 100644
index 0000000..4792843
--- /dev/null
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.state;
+
+import java.io.File;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.benchmark.state.StoreOperator.ExecMode;
+
+/**
+ * This is not really a unit test but a benchmark runner.
+ * It is provided so that developers can conveniently run the benchmark in a local IDE environment.
+ */
+public class ManagedStateBenchmarkAppTest extends ManagedStateBenchmarkApp
+{
+  public static final String basePath = "target/temp";
+
+  /**
+   * Removes whatever a previous run left under {@link #basePath}.
+   */
+  @Before
+  public void before()
+  {
+    FileUtil.fullyDelete(new File(basePath));
+  }
+
+  @Test
+  public void testUpdateSync() throws Exception
+  {
+    test(ExecMode.UPDATESYNC);
+  }
+
+  @Test
+  public void testUpdateAsync() throws Exception
+  {
+    test(ExecMode.UPDATEASYNC);
+  }
+
+  @Test
+  public void testInsert() throws Exception
+  {
+    test(ExecMode.INSERT);
+  }
+
+  /**
+   * Populates the benchmark DAG, switches the store operator to the given mode and runs the
+   * application in local mode for five minutes.
+   *
+   * @param exeMode execution mode assigned to the store operator
+   */
+  public void test(ExecMode exeMode) throws Exception
+  {
+    LocalMode localMode = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+
+    super.populateDAG(localMode.getDAG(), conf);
+    storeOperator.execMode = exeMode;
+
+    // The DAG is already populated above, so the application passed on adds nothing.
+    localMode.prepareDAG(new StreamingApplication()
+    {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf)
+      {
+      }
+    }, conf);
+
+    // Create local cluster
+    final LocalMode.Controller controller = localMode.getController();
+    controller.run(300000);
+
+    controller.shutdown();
+  }
+
+  /**
+   * Uses the fixed local {@link #basePath} instead of reading the configuration.
+   */
+  @Override
+  public String getStoreBasePath(Configuration conf)
+  {
+    return basePath;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
deleted file mode 100644
index 4435aad..0000000
--- a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.benchmark.state;
-
-import java.io.File;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.benchmark.state.StoreOperator.ExecMode;
-
-/**
- * This is not a really unit test, but in fact a benchmark runner.
- * Provides this class to give developers the convenience to run in local IDE environment.
- *
- */
-public class ManagedStateBenchmarkAppTester extends ManagedStateBenchmarkApp
-{
- public static final String basePath = "target/temp";
-
- @Before
- public void before()
- {
- FileUtil.fullyDelete(new File(basePath));
- }
-
- @Test
- public void testUpdateSync() throws Exception
- {
- test(ExecMode.UPDATESYNC);
- }
-
- @Test
- public void testUpdateAsync() throws Exception
- {
- test(ExecMode.UPDATEASYNC);
- }
-
- @Test
- public void testInsert() throws Exception
- {
- test(ExecMode.INSERT);
- }
-
- public void test(ExecMode exeMode) throws Exception
- {
- Configuration conf = new Configuration(false);
-
- LocalMode lma = LocalMode.newInstance();
- DAG dag = lma.getDAG();
-
- super.populateDAG(dag, conf);
- storeOperator.execMode = exeMode;
-
- StreamingApplication app = new StreamingApplication()
- {
- @Override
- public void populateDAG(DAG dag, Configuration conf)
- {
- }
- };
-
- lma.prepareDAG(app, conf);
-
- // Create local cluster
- final LocalMode.Controller lc = lma.getController();
- lc.run(300000);
-
- lc.shutdown();
- }
-
-
-
- @Override
- public String getStoreBasePath(Configuration conf)
- {
- return basePath;
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/benchmark/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/benchmark/src/test/resources/log4j.properties b/benchmark/src/test/resources/log4j.properties
index cf0d19e..3fc0120 100644
--- a/benchmark/src/test/resources/log4j.properties
+++ b/benchmark/src/test/resources/log4j.properties
@@ -41,3 +41,5 @@ log4j.logger.org=info
#log4j.logger.org.apache.commons.beanutils=warn
log4j.logger.com.datatorrent=debug
log4j.logger.org.apache.apex=debug
+log4j.logger.org.apache.apex.malhar.lib.state.managed=info
+log4j.logger.com.datatorrent.common.util.FSStorageAgent=info
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
index dd2bbab..20271b0 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -125,7 +125,7 @@ import com.datatorrent.netlet.util.Slice;
*/
public abstract class AbstractManagedStateImpl
implements ManagedState, Component<OperatorContext>, Operator.CheckpointNotificationListener, ManagedStateContext,
- TimeBucketAssigner.PurgeListener
+ TimeBucketAssigner.PurgeListener, BucketProvider
{
private long maxMemorySize;
@@ -319,11 +319,24 @@ public abstract class AbstractManagedStateImpl
return (int)(bucketId % numBuckets);
}
- Bucket getBucket(long bucketId)
+ @Override
+ public Bucket getBucket(long bucketId)
{
return buckets[getBucketIdx(bucketId)];
}
+ @Override
+ public Bucket ensureBucket(long bucketId)
+ {
+ Bucket b = getBucket(bucketId);
+ if (b == null) {
+ b = newBucket(bucketId);
+ b.setup(this);
+ buckets[getBucketIdx(bucketId)] = b;
+ }
+ return b;
+ }
+
protected Bucket newBucket(long bucketId)
{
return new Bucket.DefaultBucket(bucketId);
@@ -384,6 +397,22 @@ public abstract class AbstractManagedStateImpl
}
}
+ /**
+ * Gets the memory usage of each bucket.
+ * @return The map of bucket id to memory size used by the bucket
+ */
+ public Map<Long, Long> getBucketMemoryUsage()
+ {
+ Map<Long, Long> bucketToSize = Maps.newHashMap();
+ for (Bucket bucket : buckets) {
+ if (bucket == null) {
+ continue;
+ }
+ bucketToSize.put(bucket.getBucketId(), bucket.getKeyStream().size() + bucket.getValueStream().size());
+ }
+ return bucketToSize;
+ }
+
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@Override
public void teardown()
@@ -476,6 +505,7 @@ public abstract class AbstractManagedStateImpl
this.keyComparator = Preconditions.checkNotNull(keyComparator);
}
+ @Override
public BucketsFileSystem getBucketsFileSystem()
{
return bucketsFileSystem;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
index 4fc2327..cbc4e03 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
@@ -32,6 +33,10 @@ import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.utils.serde.KeyValueByteStreamProvider;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+import org.apache.apex.malhar.lib.utils.serde.WindowedBlockStream;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
@@ -46,7 +51,7 @@ import com.datatorrent.netlet.util.Slice;
*
* @since 3.4.0
*/
-public interface Bucket extends ManagedStateComponent
+public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvider
{
/**
* @return bucket id
@@ -218,13 +223,22 @@ public interface Bucket extends ManagedStateComponent
private transient TreeMap<Long, BucketsFileSystem.TimeBucketMeta> cachedBucketMetas;
+ /**
+ * By default, keys and values are kept in two separate streams.
+ * The key stream and the value stream should be created during construction rather than in setup, because references to the streams are passed to the serialize method.
+ */
+ protected WindowedBlockStream keyStream = new WindowedBlockStream();
+ protected WindowedBlockStream valueStream = new WindowedBlockStream();
+
+ protected ConcurrentLinkedQueue<Long> windowsForFreeMemory = new ConcurrentLinkedQueue<>();
+
private DefaultBucket()
{
//for kryo
bucketId = -1;
}
- protected DefaultBucket(long bucketId)
+ public DefaultBucket(long bucketId)
{
this.bucketId = bucketId;
}
@@ -321,6 +335,9 @@ public interface Bucket extends ManagedStateComponent
@Override
public Slice get(Slice key, long timeBucket, ReadSource readSource)
{
+ // This call is lightweight
+ releaseMemory();
+ key = SliceUtils.toBufferSlice(key);
switch (readSource) {
case MEMORY:
return getFromMemory(key);
@@ -392,6 +409,11 @@ public interface Bucket extends ManagedStateComponent
@Override
public void put(Slice key, long timeBucket, Slice value)
{
+ // This call is lightweight
+ releaseMemory();
+ key = SliceUtils.toBufferSlice(key);
+ value = SliceUtils.toBufferSlice(value);
+
BucketedValue bucketedValue = flash.get(key);
if (bucketedValue == null) {
bucketedValue = new BucketedValue(timeBucket, value);
@@ -409,39 +431,45 @@ public interface Bucket extends ManagedStateComponent
}
}
+ /**
+ * Requests that memory be freed up to the given windowId.
+ * This method is called by a thread other than the operator thread. Adding concurrency control to the streams would hurt performance,
+ * so this method only calculates the size of the memory that could be released and then queues a free-memory request for the operator thread.
+ */
@Override
public long freeMemory(long windowId) throws IOException
{
- long memoryFreed = 0;
- Long clearWindowId;
-
- while ((clearWindowId = committedData.floorKey(windowId)) != null) {
- Map<Slice, BucketedValue> windowData = committedData.remove(clearWindowId);
+ // Calculate the size first and then send the release-memory request. This can reduce the chance of conflict and improve performance.
+ long size = keyStream.dataSizeUpToWindow(windowId) + valueStream.dataSizeUpToWindow(windowId);
+ windowsForFreeMemory.add(windowId);
+ return size;
+ }
- for (Map.Entry<Slice, BucketedValue> entry: windowData.entrySet()) {
- memoryFreed += entry.getKey().length + entry.getValue().getSize();
- }
+ /**
+ * This operation must be called from the operator thread. It does nothing if there is no memory to be freed.
+ */
+ protected long releaseMemory()
+ {
+ long memoryFreed = 0;
+ while (!windowsForFreeMemory.isEmpty()) {
+ long windowId = windowsForFreeMemory.poll();
+ long originSize = keyStream.size() + valueStream.size();
+ keyStream.completeWindow(windowId);
+ valueStream.completeWindow(windowId);
+ memoryFreed += originSize - (keyStream.size() + valueStream.size());
}
- fileCache.clear();
- if (cachedBucketMetas != null) {
-
- for (BucketsFileSystem.TimeBucketMeta tbm : cachedBucketMetas.values()) {
- FileAccess.FileReader reader = readers.remove(tbm.getTimeBucketId());
- if (reader != null) {
- memoryFreed += tbm.getSizeInBytes();
- reader.close();
- }
- }
+ if (memoryFreed > 0) {
+ LOG.debug("Total freed memory size: {}", memoryFreed);
+ sizeInBytes.getAndAdd(-memoryFreed);
}
- sizeInBytes.getAndAdd(-memoryFreed);
- LOG.debug("space freed {} {}", bucketId, memoryFreed);
return memoryFreed;
}
@Override
public Map<Slice, BucketedValue> checkpoint(long windowId)
{
+ releaseMemory();
try {
//transferring the data from flash to check-pointed state in finally block and re-initializing the flash.
return flash;
@@ -548,6 +576,19 @@ public interface Bucket extends ManagedStateComponent
return checkpointedData;
}
+
+ @Override
+ public WindowedBlockStream getKeyStream()
+ {
+ return keyStream;
+ }
+
+ @Override
+ public WindowedBlockStream getValueStream()
+ {
+ return valueStream;
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(DefaultBucket.class);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketProvider.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketProvider.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketProvider.java
new file mode 100644
index 0000000..bbd18ac
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketProvider.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+/**
+ * This interface declares methods to get bucket by bucket id
+ *
+ */
+public interface BucketProvider
+{
+ /**
+ * Gets the bucket with the given bucket id.
+ * @param bucketId
+ * @return
+ */
+ public Bucket getBucket(long bucketId);
+
+ /**
+ * Creates the bucket if it does not exist, and returns the bucket.
+ * @param bucketId
+ * @return
+ */
+ public Bucket ensureBucket(long bucketId);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
index a59872c..d0ca9ff 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
@@ -26,9 +26,9 @@ import java.util.ListIterator;
import javax.validation.constraints.NotNull;
+import org.apache.apex.malhar.lib.utils.serde.CollectionSerde;
+import org.apache.apex.malhar.lib.utils.serde.IntSerde;
import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SerdeCollectionSlice;
-import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
import org.apache.hadoop.classification.InterfaceStability;
import com.esotericsoftware.kryo.DefaultSerializer;
@@ -37,7 +37,6 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.Slice;
/**
* A Spillable implementation of {@link List} backed by a {@link SpillableStateStore}.
@@ -58,11 +57,10 @@ public class SpillableArrayListImpl<T> implements Spillable.SpillableList<T>, Sp
@NotNull
private SpillableStateStore store;
@NotNull
- private Serde<T, Slice> serde;
+ private Serde<T> serde;
@NotNull
private SpillableMapImpl<Integer, List<T>> map;
- private boolean sizeCached = false;
private int size;
private int numBatches;
@@ -86,15 +84,15 @@ public class SpillableArrayListImpl<T> implements Spillable.SpillableList<T>, Sp
*/
public SpillableArrayListImpl(long bucketId, @NotNull byte[] prefix,
@NotNull SpillableStateStore store,
- @NotNull Serde<T, Slice> serde)
+ @NotNull Serde<T> serde)
{
this.bucketId = bucketId;
this.prefix = Preconditions.checkNotNull(prefix);
this.store = Preconditions.checkNotNull(store);
this.serde = Preconditions.checkNotNull(serde);
- map = new SpillableMapImpl<>(store, prefix, bucketId, new SerdeIntSlice(),
- new SerdeCollectionSlice<>(serde, (Class<List<T>>)(Class)ArrayList.class));
+ map = new SpillableMapImpl<>(store, prefix, bucketId, new IntSerde(),
+ new CollectionSerde<T, List<T>>(serde, (Class)ArrayList.class));
}
/**
@@ -111,7 +109,7 @@ public class SpillableArrayListImpl<T> implements Spillable.SpillableList<T>, Sp
*/
public SpillableArrayListImpl(long bucketId, @NotNull byte[] prefix,
@NotNull SpillableStateStore store,
- @NotNull Serde<T, Slice> serde,
+ @NotNull Serde<T> serde,
int batchSize)
{
this(bucketId, prefix, store, serde);
@@ -328,6 +326,7 @@ public class SpillableArrayListImpl<T> implements Spillable.SpillableList<T>, Sp
@Override
public void setup(Context.OperatorContext context)
{
+ store.ensureBucket(bucketId);
map.setup(context);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java
index 0944583..d3340ce 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java
@@ -26,10 +26,10 @@ import java.util.Set;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
+import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager;
+import org.apache.apex.malhar.lib.utils.serde.IntSerde;
import org.apache.apex.malhar.lib.utils.serde.PassThruSliceSerde;
import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
-import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
import org.apache.hadoop.classification.InterfaceStability;
import com.esotericsoftware.kryo.DefaultSerializer;
@@ -62,10 +62,11 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
@NotNull
private SpillableMapImpl<Slice, Integer> map;
private SpillableStateStore store;
- private byte[] identifier;
private long bucket;
- private Serde<K, Slice> serdeKey;
- private Serde<V, Slice> serdeValue;
+ private Serde<V> valueSerde;
+
+ protected transient Context.OperatorContext context;
+ protected AffixKeyValueSerdeManager<K, V> keyValueSerdeManager;
private SpillableArrayListMultimapImpl()
{
@@ -78,20 +79,20 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
* @param identifier The Id of this {@link SpillableArrayListMultimapImpl}.
* @param bucket The Id of the bucket used to store this
* {@link SpillableArrayListMultimapImpl} in the provided {@link SpillableStateStore}.
- * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
- * @param serdeKey The {@link Serde} to use when serializing and deserializing values.
+ * @param keySerde The {@link Serde} to use when serializing and deserializing keys.
+ * @param valueSerde The {@link Serde} to use when serializing and deserializing values.
*/
public SpillableArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
- Serde<K, Slice> serdeKey,
- Serde<V, Slice> serdeValue)
+ Serde<K> keySerde,
+ Serde<V> valueSerde)
{
this.store = Preconditions.checkNotNull(store);
- this.identifier = Preconditions.checkNotNull(identifier);
this.bucket = bucket;
- this.serdeKey = Preconditions.checkNotNull(serdeKey);
- this.serdeValue = Preconditions.checkNotNull(serdeValue);
+ this.valueSerde = Preconditions.checkNotNull(valueSerde);
+
+ keyValueSerdeManager = new AffixKeyValueSerdeManager<K, V>(SIZE_KEY_SUFFIX, identifier, Preconditions.checkNotNull(keySerde), valueSerde);
- map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new SerdeIntSlice());
+ map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new IntSerde());
}
public SpillableStateStore getStore()
@@ -110,15 +111,12 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
if (spillableArrayList == null) {
- Slice keySlice = serdeKey.serialize(key);
- Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX));
-
+ Integer size = map.get(keyValueSerdeManager.serializeMetaKey(key, false));
if (size == null) {
return null;
}
- Slice keyPrefix = SliceUtils.concatenate(identifier, keySlice);
- spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue);
+ spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyValueSerdeManager.serializeDataKey(key, false).toByteArray(), store, valueSerde);
spillableArrayList.setSize(size);
}
@@ -179,8 +177,7 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
@Override
public boolean containsKey(@Nullable Object key)
{
- return cache.contains((K)key) || map.containsKey(SliceUtils.concatenate(serdeKey.serialize((K)key),
- SIZE_KEY_SUFFIX));
+ return cache.contains((K)key) || map.containsKey(keyValueSerdeManager.serializeMetaKey((K)key, false));
}
@Override
@@ -217,9 +214,9 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
SpillableArrayListImpl<V> spillableArrayList = getHelper(key);
if (spillableArrayList == null) {
- Slice keyPrefix = SliceUtils.concatenate(identifier, serdeKey.serialize(key));
- spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue);
-
+ Slice keyPrefix = keyValueSerdeManager.serializeDataKey(key, true);
+ spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, valueSerde);
+ spillableArrayList.setup(context);
cache.put(key, spillableArrayList);
}
@@ -272,14 +269,19 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
@Override
public void setup(Context.OperatorContext context)
{
+ this.context = context;
+
map.setup(context);
isRunning = true;
+
+ keyValueSerdeManager.setup(store, bucket);
}
@Override
public void beginWindow(long windowId)
{
map.beginWindow(windowId);
+ keyValueSerdeManager.beginWindow(windowId);
isInWindow = true;
}
@@ -292,13 +294,14 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
spillableArrayList.endWindow();
- Integer size = map.put(SliceUtils.concatenate(serdeKey.serialize(key), SIZE_KEY_SUFFIX),
- spillableArrayList.size());
+ map.put(keyValueSerdeManager.serializeMetaKey(key, true), spillableArrayList.size());
}
Preconditions.checkState(cache.getRemovedKeys().isEmpty());
cache.endWindow();
map.endWindow();
+
+ keyValueSerdeManager.resetReadBuffer();
}
@Override
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
index c4462d5..542a914 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
@@ -24,7 +24,6 @@ import org.apache.apex.malhar.lib.utils.serde.Serde;
import com.datatorrent.api.Component;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Operator;
-import com.datatorrent.netlet.util.Slice;
/**
* This is a composite component containing spillable data structures. This should be used as
@@ -43,7 +42,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableList}.
* @return A {@link SpillableList}.
*/
- <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde);
+ <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T> serde);
/**
* This is a method for creating a {@link SpillableList}.
@@ -53,7 +52,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableList}.
* @return A {@link SpillableList}.
*/
- <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T, Slice> serde);
+ <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T> serde);
/**
* This is a method for creating a {@link SpillableMap}. This method
@@ -65,8 +64,8 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* @param serdeValue The Serializer/Deserializer to use for the map's values.
* @return A {@link SpillableMap}.
*/
- <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K, Slice> serdeKey,
- Serde<V, Slice> serdeValue);
+ <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K> serdeKey,
+ Serde<V> serdeValue);
/**
* This is a method for creating a {@link SpillableMap}.
@@ -79,7 +78,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* @return A {@link SpillableMap}.
*/
<K, V> SpillableMap<K, V> newSpillableMap(byte[] identifier, long bucket,
- Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue);
+ Serde<K> serdeKey, Serde<V> serdeValue);
/**
* This is a method for creating a {@link SpillableListMultimap}. This method
@@ -91,8 +90,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* @param serdeValue The Serializer/Deserializer to use for the values in the map's lists.
* @return A {@link SpillableListMultimap}.
*/
- <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K,
- Slice> serdeKey, Serde<V, Slice> serdeValue);
+ <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue);
/**
* This is a method for creating a {@link SpillableListMultimap}.
@@ -105,8 +103,8 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* @return A {@link SpillableListMultimap}.
*/
<K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(byte[] identifier, long bucket,
- Serde<K, Slice> serdeKey,
- Serde<V, Slice> serdeValue);
+ Serde<K> serdeKey,
+ Serde<V> serdeValue);
/**
* This is a method for creating a {@link SpillableSetMultimap}.
@@ -117,8 +115,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* @param serdeValue The Serializer/Deserializer to use for the values in the map's lists.
* @return A {@link SpillableSetMultimap}.
*/
- <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K,
- Slice> serdeKey, Serde<V, Slice> serdeValue);
+ <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue);
/**
* This is a method for creating a {@link SpillableMultiset}. This method
@@ -128,7 +125,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableMultiset}.
* @return A {@link SpillableMultiset}.
*/
- <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T, Slice> serde);
+ <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T> serde);
/**
* This is a method for creating a {@link SpillableMultiset}.
@@ -138,7 +135,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableMultiset}.
* @return A {@link SpillableMultiset}.
*/
- <T> SpillableMultiset<T> newSpillableMultiset(byte[] identifier, long bucket, Serde<T, Slice> serde);
+ <T> SpillableMultiset<T> newSpillableMultiset(byte[] identifier, long bucket, Serde<T> serde);
/**
* This is a method for creating a {@link SpillableQueue}. This method
@@ -148,7 +145,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableQueue}.
* @return A {@link SpillableQueue}.
*/
- <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T, Slice> serde);
+ <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T> serde);
/**
* This is a method for creating a {@link SpillableQueue}.
@@ -158,5 +155,5 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableQueue}.
* @return A {@link SpillableQueue}.
*/
- <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T, Slice> serde);
+ <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T> serde);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
index aad219d..1a3f550 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
@@ -19,6 +19,7 @@
package org.apache.apex.malhar.lib.state.spillable;
import java.util.List;
+import java.util.Set;
import javax.validation.constraints.NotNull;
@@ -27,9 +28,9 @@ import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.Slice;
/**
* This is a factory that is used for Spillable datastructures. This component is used by nesting it inside of an
@@ -50,6 +51,11 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
@NotNull
private SpillableIdentifierGenerator identifierGenerator;
+ /**
+ * Bucket ids recorded by the factory methods so that all the buckets can be created during setup.
+ */
+ protected transient Set<Long> bucketIds = Sets.newHashSet();
+
private SpillableComplexComponentImpl()
{
// for kryo
@@ -66,84 +72,99 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
this.identifierGenerator = Preconditions.checkNotNull(identifierGenerator);
}
- public <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde)
+ @Override
+ public <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T> serde)
{
SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifierGenerator.next(), store, serde);
componentList.add(list);
return list;
}
- public <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T, Slice> serde)
+ @Override
+ public <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T> serde)
{
identifierGenerator.register(identifier);
SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifier, store, serde);
+ bucketIds.add(bucket);
componentList.add(list);
return list;
}
- public <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K, Slice> serdeKey,
- Serde<V, Slice> serdeValue)
+ @Override
+ public <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K> serdeKey,
+ Serde<V> serdeValue)
{
SpillableMapImpl<K, V> map = new SpillableMapImpl<K, V>(store, identifierGenerator.next(),
bucket, serdeKey, serdeValue);
+ bucketIds.add(bucket);
componentList.add(map);
return map;
}
- public <K, V> SpillableMap<K, V> newSpillableMap(byte[] identifier, long bucket, Serde<K, Slice> serdeKey,
- Serde<V, Slice> serdeValue)
+ @Override
+ public <K, V> SpillableMap<K, V> newSpillableMap(byte[] identifier, long bucket, Serde<K> serdeKey,
+ Serde<V> serdeValue)
{
identifierGenerator.register(identifier);
SpillableMapImpl<K, V> map = new SpillableMapImpl<K, V>(store, identifier, bucket, serdeKey, serdeValue);
+ bucketIds.add(bucket);
componentList.add(map);
return map;
}
- public <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K,
- Slice> serdeKey, Serde<V, Slice> serdeValue)
+ @Override
+ public <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue)
{
SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<K, V>(store,
identifierGenerator.next(), bucket, serdeKey, serdeValue);
+ bucketIds.add(bucket);
componentList.add(map);
return map;
}
+ @Override
public <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(byte[] identifier, long bucket,
- Serde<K, Slice> serdeKey,
- Serde<V, Slice> serdeValue)
+ Serde<K> serdeKey,
+ Serde<V> serdeValue)
{
identifierGenerator.register(identifier);
SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<K, V>(store,
identifier, bucket, serdeKey, serdeValue);
+ bucketIds.add(bucket);
componentList.add(map);
return map;
}
- public <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K,
- Slice> serdeKey, Serde<V, Slice> serdeValue)
+ @Override
+ public <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue)
{
SpillableSetMultimapImpl<K, V> map = new SpillableSetMultimapImpl<K, V>(store,
identifierGenerator.next(), bucket, serdeKey, serdeValue);
+ bucketIds.add(bucket);
componentList.add(map);
return map;
}
- public <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T, Slice> serde)
+ @Override
+ public <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T> serde)
{
throw new UnsupportedOperationException("Unsupported Operation");
}
- public <T> SpillableMultiset<T> newSpillableMultiset(byte[] identifier, long bucket, Serde<T, Slice> serde)
+ @Override
+ public <T> SpillableMultiset<T> newSpillableMultiset(byte[] identifier, long bucket, Serde<T> serde)
{
throw new UnsupportedOperationException("Unsupported Operation");
}
- public <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T, Slice> serde)
+ @Override
+ public <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T> serde)
{
throw new UnsupportedOperationException("Unsupported Operation");
}
- public <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T, Slice> serde)
+ @Override
+ public <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T> serde)
{
throw new UnsupportedOperationException("Unsupported Operation");
}
@@ -152,6 +173,15 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
public void setup(Context.OperatorContext context)
{
store.setup(context);
+
+ //ensure the buckets are created.
+ for (long bucketId : bucketIds) {
+ store.ensureBucket(bucketId);
+ }
+
+ //the bucket ids are only for setup. We don't need bucket ids during run time.
+ bucketIds.clear();
+
for (SpillableComponent spillableComponent: componentList) {
spillableComponent.setup(context);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
index 016aeec..5fa39d7 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
@@ -26,13 +26,13 @@ import java.util.Set;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager;
+import org.apache.apex.malhar.lib.utils.serde.BufferSlice;
import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.classification.InterfaceStability;
import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.google.common.base.Preconditions;
@@ -51,21 +51,20 @@ import com.datatorrent.netlet.util.Slice;
public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spillable.SpillableComponent,
Serializable
{
+ private static final long serialVersionUID = 4552547110215784584L;
private transient WindowBoundedMapCache<K, V> cache = new WindowBoundedMapCache<>();
- private transient MutableInt tempOffset = new MutableInt();
+ private transient Input tmpInput = new Input();
@NotNull
private SpillableStateStore store;
@NotNull
private byte[] identifier;
private long bucket;
- @NotNull
- private Serde<K, Slice> serdeKey;
- @NotNull
- private Serde<V, Slice> serdeValue;
private int size = 0;
+ protected AffixKeyValueSerdeManager<K, V> keyValueSerdeManager;
+
private SpillableMapImpl()
{
//for kryo
@@ -77,17 +76,16 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
* @param identifier The Id of this {@link SpillableMapImpl}.
* @param bucket The Id of the bucket used to store this
* {@link SpillableMapImpl} in the provided {@link SpillableStateStore}.
- * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
- * @param serdeKey The {@link Serde} to use when serializing and deserializing values.
+ * @param keySerde The {@link Serde} to use when serializing and deserializing keys.
+ * @param valueSerde The {@link Serde} to use when serializing and deserializing values.
*/
- public SpillableMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K, Slice> serdeKey,
- Serde<V, Slice> serdeValue)
+ public SpillableMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K> keySerde,
+ Serde<V> valueSerde)
{
this.store = Preconditions.checkNotNull(store);
this.identifier = Preconditions.checkNotNull(identifier);
this.bucket = bucket;
- this.serdeKey = Preconditions.checkNotNull(serdeKey);
- this.serdeValue = Preconditions.checkNotNull(serdeValue);
+ keyValueSerdeManager = new AffixKeyValueSerdeManager<K, V>(null, identifier, Preconditions.checkNotNull(keySerde), Preconditions.checkNotNull(valueSerde));
}
public SpillableStateStore getStore()
@@ -134,16 +132,17 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
return val;
}
- Slice valSlice = store.getSync(bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)));
+ Slice valSlice = store.getSync(bucket, keyValueSerdeManager.serializeDataKey(key, false));
if (valSlice == null || valSlice == BucketedState.EXPIRED || valSlice.length == 0) {
return null;
}
- tempOffset.setValue(0);
- return serdeValue.deserialize(valSlice, tempOffset);
+ tmpInput.setBuffer(valSlice.buffer, valSlice.offset, valSlice.length);
+ return keyValueSerdeManager.deserializeValue(tmpInput);
}
+
@Override
public V put(K k, V v)
{
@@ -207,6 +206,8 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
@Override
public void setup(Context.OperatorContext context)
{
+ store.ensureBucket(bucket);
+ keyValueSerdeManager.setup(store, bucket);
}
@Override
@@ -218,16 +219,15 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
public void endWindow()
{
for (K key: cache.getChangedKeys()) {
- store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)),
- serdeValue.serialize(cache.get(key)));
+ store.put(bucket, keyValueSerdeManager.serializeDataKey(key, true),
+ keyValueSerdeManager.serializeValue(cache.get(key)));
}
for (K key: cache.getRemovedKeys()) {
- store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)),
- new Slice(ArrayUtils.EMPTY_BYTE_ARRAY));
+ store.put(this.bucket, keyValueSerdeManager.serializeDataKey(key, true), BufferSlice.EMPTY_SLICE);
}
-
cache.endWindow();
+ keyValueSerdeManager.resetReadBuffer();
}
@Override
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
index c2741b0..0dfc411 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
@@ -26,15 +26,15 @@ import java.util.NoSuchElementException;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.classification.InterfaceStability;
import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.google.common.base.Preconditions;
import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.Slice;
/**
* A Spillable implementation of {@link List} backed by a {@link SpillableStateStore}.
@@ -62,49 +62,30 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable
T next;
}
- public static class SerdeListNodeSlice<T> implements Serde<ListNode<T>, Slice>
+ public static class ListNodeSerde<T> implements Serde<ListNode<T>>
{
- private Serde<T, Slice> serde;
- private static Slice falseSlice = new Slice(new byte[]{0});
- private static Slice trueSlice = new Slice(new byte[]{1});
+ private Serde<T> serde;
- public SerdeListNodeSlice(@NotNull Serde<T, Slice> serde)
+ public ListNodeSerde(@NotNull Serde<T> serde)
{
this.serde = Preconditions.checkNotNull(serde);
}
@Override
- public Slice serialize(ListNode<T> object)
+ public void serialize(ListNode<T> object, Output output)
{
- int size = 0;
-
- Slice slice1 = object.valid ? trueSlice : falseSlice;
- size += 1;
- Slice slice2 = serde.serialize(object.next);
- size += slice2.length;
-
- byte[] bytes = new byte[size];
- System.arraycopy(slice1.buffer, slice1.offset, bytes, 0, slice1.length);
- System.arraycopy(slice2.buffer, slice2.offset, bytes, slice1.length, slice2.length);
-
- return new Slice(bytes);
+ output.writeBoolean(object.valid);
+ serde.serialize(object.next, output);
}
@Override
- public ListNode<T> deserialize(Slice slice, MutableInt offset)
+ public ListNode<T> deserialize(Input input)
{
ListNode<T> result = new ListNode<>();
- result.valid = slice.buffer[offset.intValue()] != 0;
- offset.add(1);
- result.next = serde.deserialize(slice, offset);
+ result.valid = input.readBoolean();
+ result.next = serde.deserialize(input);
return result;
}
-
- @Override
- public ListNode<T> deserialize(Slice object)
- {
- return deserialize(object, new MutableInt(0));
- }
}
@NotNull
@@ -135,11 +116,11 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable
*/
public SpillableSetImpl(long bucketId, @NotNull byte[] prefix,
@NotNull SpillableStateStore store,
- @NotNull Serde<T, Slice> serde)
+ @NotNull Serde<T> serde)
{
this.store = Preconditions.checkNotNull(store);
- map = new SpillableMapImpl<>(store, prefix, bucketId, serde, new SerdeListNodeSlice(serde));
+ map = new SpillableMapImpl<>(store, prefix, bucketId, serde, new ListNodeSerde(serde));
}
public void setSize(int size)
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
index 98f60d2..76e47f2 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
@@ -27,11 +27,11 @@ import java.util.Set;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
+import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager;
+import org.apache.apex.malhar.lib.utils.serde.IntSerde;
+import org.apache.apex.malhar.lib.utils.serde.PairSerde;
import org.apache.apex.malhar.lib.utils.serde.PassThruSliceSerde;
import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
-import org.apache.apex.malhar.lib.utils.serde.SerdePairSlice;
-import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceStability;
@@ -65,10 +65,11 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
private SpillableStateStore store;
private byte[] identifier;
private long bucket;
- private Serde<K, Slice> serdeKey;
- private Serde<V, Slice> serdeValue;
+ private Serde<V> valueSerde;
private transient List<SpillableSetImpl<V>> removedSets = new ArrayList<>();
+ protected AffixKeyValueSerdeManager<K, V> keyValueSerdeManager;
+ protected transient Context.OperatorContext context;
private SpillableSetMultimapImpl()
{
// for kryo
@@ -84,16 +85,15 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
* @param serdeKey The {@link Serde} to use when serializing and deserializing values.
*/
public SpillableSetMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
- Serde<K, Slice> serdeKey,
- Serde<V, Slice> serdeValue)
+ Serde<K> keySerde,
+ Serde<V> valueSerde)
{
this.store = Preconditions.checkNotNull(store);
- this.identifier = Preconditions.checkNotNull(identifier);
this.bucket = bucket;
- this.serdeKey = Preconditions.checkNotNull(serdeKey);
- this.serdeValue = Preconditions.checkNotNull(serdeValue);
+ this.valueSerde = Preconditions.checkNotNull(valueSerde);
+ keyValueSerdeManager = new AffixKeyValueSerdeManager<K, V>(META_KEY_SUFFIX, identifier, Preconditions.checkNotNull(keySerde), valueSerde);
- map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new SerdePairSlice<>(new SerdeIntSlice(), serdeValue));
+ map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new PairSerde<>(new IntSerde(), valueSerde));
}
public SpillableStateStore getStore()
@@ -112,17 +112,17 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
SpillableSetImpl<V> spillableSet = cache.get(key);
if (spillableSet == null) {
- Slice keySlice = serdeKey.serialize(key);
- Pair<Integer, V> meta = map.get(SliceUtils.concatenate(keySlice, META_KEY_SUFFIX));
+ Pair<Integer, V> meta = map.get(keyValueSerdeManager.serializeMetaKey(key, false));
if (meta == null) {
return null;
}
- Slice keyPrefix = SliceUtils.concatenate(identifier, keySlice);
- spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, serdeValue);
+ Slice keyPrefix = keyValueSerdeManager.serializeDataKey(key, false);
+ spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, valueSerde);
spillableSet.setSize(meta.getLeft());
spillableSet.setHead(meta.getRight());
+ spillableSet.setup(context);
}
cache.put(key, spillableSet);
@@ -166,7 +166,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
SpillableSetImpl<V> spillableSet = getHelper((K)key);
if (spillableSet != null) {
cache.remove((K)key);
- Slice keySlice = SliceUtils.concatenate(serdeKey.serialize((K)key), META_KEY_SUFFIX);
+ Slice keySlice = keyValueSerdeManager.serializeMetaKey((K)key, false);
map.put(keySlice, new ImmutablePair<>(0, spillableSet.getHead()));
spillableSet.clear();
removedSets.add(spillableSet);
@@ -199,7 +199,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
if (cache.contains((K)key)) {
return true;
}
- Slice keySlice = SliceUtils.concatenate(serdeKey.serialize((K)key), META_KEY_SUFFIX);
+ Slice keySlice = keyValueSerdeManager.serializeMetaKey((K)key, false);
Pair<Integer, V> meta = map.get(keySlice);
return meta != null && meta.getLeft() > 0;
}
@@ -227,8 +227,8 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
SpillableSetImpl<V> spillableSet = getHelper(key);
if (spillableSet == null) {
- Slice keyPrefix = SliceUtils.concatenate(identifier, serdeKey.serialize(key));
- spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, serdeValue);
+ spillableSet = new SpillableSetImpl<V>(bucket, keyValueSerdeManager.serializeDataKey(key, true).toByteArray(), store, valueSerde);
+ spillableSet.setup(context);
cache.put(key, spillableSet);
}
return spillableSet.add(value);
@@ -284,13 +284,16 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
@Override
public void setup(Context.OperatorContext context)
{
+ this.context = context;
map.setup(context);
+ keyValueSerdeManager.setup(store, bucket);
}
@Override
public void beginWindow(long windowId)
{
map.beginWindow(windowId);
+ keyValueSerdeManager.beginWindow(windowId);
}
@Override
@@ -301,7 +304,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
SpillableSetImpl<V> spillableSet = cache.get(key);
spillableSet.endWindow();
- map.put(SliceUtils.concatenate(serdeKey.serialize(key), META_KEY_SUFFIX),
+ map.put(keyValueSerdeManager.serializeMetaKey(key, true),
new ImmutablePair<>(spillableSet.size(), spillableSet.getHead()));
}
@@ -311,6 +314,8 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
cache.endWindow();
map.endWindow();
+
+ keyValueSerdeManager.resetReadBuffer();
}
@Override
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
index b6ee3c0..44f003b 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
@@ -19,6 +19,7 @@
package org.apache.apex.malhar.lib.state.spillable;
import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.BucketProvider;
import org.apache.hadoop.classification.InterfaceStability;
import com.datatorrent.api.Component;
@@ -32,6 +33,6 @@ import com.datatorrent.api.Operator;
*/
@InterfaceStability.Evolving
public interface SpillableStateStore extends BucketedState, Component<Context.OperatorContext>,
- Operator.CheckpointNotificationListener, WindowListener
+ Operator.CheckpointNotificationListener, WindowListener, BucketProvider
{
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
index 0e1d55e..e80d38d 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
@@ -21,6 +21,9 @@ package org.apache.apex.malhar.lib.state.spillable;
import java.util.Map;
import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.base.Preconditions;
@@ -39,6 +42,7 @@ import com.google.common.collect.Sets;
@InterfaceStability.Evolving
public class WindowBoundedMapCache<K, V>
{
+ private static final transient Logger logger = LoggerFactory.getLogger(WindowBoundedMapCache.class);
public static final int DEFAULT_MAX_SIZE = 50000;
private int maxSize = DEFAULT_MAX_SIZE;
@@ -109,7 +113,6 @@ public class WindowBoundedMapCache<K, V>
Note: beginWindow is intentionally not implemented because many users need a cache that does not require
beginWindow to be called.
*/
-
public void endWindow()
{
int count = cache.size() - maxSize;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java
index 61ab8a8..8acb044 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java
@@ -23,7 +23,10 @@ import java.util.concurrent.Future;
import javax.validation.constraints.NotNull;
+import org.apache.apex.malhar.lib.state.managed.Bucket;
import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.BufferSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.collect.Maps;
@@ -74,6 +77,8 @@ public class InMemSpillableStateStore implements SpillableStateStore
bucket = Maps.newHashMap();
store.put(bucketId, bucket);
}
+ key = SliceUtils.toBufferSlice(key);
+ value = SliceUtils.toBufferSlice(value);
bucket.put(key, value);
}
@@ -88,6 +93,10 @@ public class InMemSpillableStateStore implements SpillableStateStore
store.put(bucketId, bucket);
}
+ if (key.getClass() == Slice.class) {
+ //The hashCode of a plain Slice is not correct, so wrap the key in a BufferSlice before the lookup
+ key = new BufferSlice(key);
+ }
return bucket.get(key);
}
@@ -117,4 +126,21 @@ public class InMemSpillableStateStore implements SpillableStateStore
{
return store.toString();
}
+
+ protected Bucket.DefaultBucket bucket;
+
+ @Override
+ public Bucket getBucket(long bucketId)
+ {
+ return bucket;
+ }
+
+ @Override
+ public Bucket ensureBucket(long bucketId)
+ {
+ if (bucket == null) {
+ bucket = new Bucket.DefaultBucket(1);
+ }
+ return bucket;
+ }
}