You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2016/09/20 23:50:23 UTC
[1/2] apex-malhar git commit: APEXMALHAR-2247 #resolve Added
iteration feature in SpillableArrayListImpl and generalize SerdeListSlice to
SerdeCollectionSlice
Repository: apex-malhar
Updated Branches:
refs/heads/master e0081143f -> 0a924ada9
APEXMALHAR-2247 #resolve Added iteration feature in SpillableArrayListImpl and generalize SerdeListSlice to SerdeCollectionSlice
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/fd20718a
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/fd20718a
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/fd20718a
Branch: refs/heads/master
Commit: fd20718a7e7fd832eb0cce59b6d5ef9d3b388e40
Parents: 9f9da0e
Author: David Yan <da...@datatorrent.com>
Authored: Mon Sep 19 17:53:21 2016 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Mon Sep 19 17:53:21 2016 -0700
----------------------------------------------------------------------
.../state/spillable/SpillableArrayListImpl.java | 29 ++++-
.../lib/utils/serde/SerdeCollectionSlice.java | 120 +++++++++++++++++++
.../malhar/lib/utils/serde/SerdeListSlice.java | 111 -----------------
.../lib/state/spillable/SpillableTestUtils.java | 14 ++-
.../utils/serde/SerdeCollectionSliceTest.java | 65 ++++++++++
.../lib/utils/serde/SerdeListSliceTest.java | 45 -------
6 files changed, 219 insertions(+), 165 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fd20718a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
index da5b140..4ea1923 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.apex.malhar.lib.state.spillable;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -26,8 +27,8 @@ import java.util.ListIterator;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeCollectionSlice;
import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
-import org.apache.apex.malhar.lib.utils.serde.SerdeListSlice;
import org.apache.hadoop.classification.InterfaceStability;
import com.esotericsoftware.kryo.DefaultSerializer;
@@ -92,7 +93,8 @@ public class SpillableArrayListImpl<T> implements Spillable.SpillableArrayList<T
this.store = Preconditions.checkNotNull(store);
this.serde = Preconditions.checkNotNull(serde);
- map = new SpillableByteMapImpl<>(store, prefix, bucketId, new SerdeIntSlice(), new SerdeListSlice(serde));
+ map = new SpillableByteMapImpl<>(store, prefix, bucketId, new SerdeIntSlice(),
+ new SerdeCollectionSlice<>(serde, (Class<List<T>>)(Class)ArrayList.class));
}
/**
@@ -145,7 +147,28 @@ public class SpillableArrayListImpl<T> implements Spillable.SpillableArrayList<T
@Override
public Iterator<T> iterator()
{
- throw new UnsupportedOperationException();
+ return new Iterator<T>()
+ {
+ private int index = 0;
+
+ @Override
+ public boolean hasNext()
+ {
+ return index < size;
+ }
+
+ @Override
+ public T next()
+ {
+ return get(index++);
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
}
@Override
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fd20718a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java
new file mode 100644
index 0000000..eca1d5f
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.Collection;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This is an implementation of {@link Serde} which serializes and deserializes lists.
+ *
+ * @since 3.5.0
+ */
+@InterfaceStability.Evolving
+public class SerdeCollectionSlice<T, CollectionT extends Collection<T>> implements Serde<CollectionT, Slice>
+{
+ @NotNull
+ private Serde<T, Slice> serde;
+
+ @NotNull
+ private Class<? extends CollectionT> collectionClass;
+
+ private SerdeCollectionSlice()
+ {
+ // for Kryo
+ }
+
+ /**
+ * Creates a {@link SerdeCollectionSlice}.
+ * @param serde The {@link Serde} that is used to serialize and deserialize each element of a list.
+ */
+ public SerdeCollectionSlice(@NotNull Serde<T, Slice> serde, @NotNull Class<? extends CollectionT> collectionClass)
+ {
+ this.serde = Preconditions.checkNotNull(serde);
+ this.collectionClass = Preconditions.checkNotNull(collectionClass);
+ }
+
+ @Override
+ public Slice serialize(CollectionT objects)
+ {
+ Slice[] slices = new Slice[objects.size()];
+
+ int size = 4;
+
+ int index = 0;
+ for (T object : objects) {
+ Slice slice = serde.serialize(object);
+ slices[index++] = slice;
+ size += slice.length;
+ }
+
+ byte[] bytes = new byte[size];
+ int offset = 0;
+
+ byte[] sizeBytes = GPOUtils.serializeInt(objects.size());
+ System.arraycopy(sizeBytes, 0, bytes, offset, 4);
+ offset += 4;
+
+ for (index = 0; index < slices.length; index++) {
+ Slice slice = slices[index];
+ System.arraycopy(slice.buffer, slice.offset, bytes, offset, slice.length);
+ offset += slice.length;
+ }
+
+ return new Slice(bytes);
+ }
+
+ @Override
+ public CollectionT deserialize(Slice slice, MutableInt offset)
+ {
+ MutableInt sliceOffset = new MutableInt(slice.offset + offset.intValue());
+
+ int numElements = GPOUtils.deserializeInt(slice.buffer, sliceOffset);
+ sliceOffset.subtract(slice.offset);
+ try {
+ CollectionT collection = collectionClass.newInstance();
+
+ for (int index = 0; index < numElements; index++) {
+ T object = serde.deserialize(slice, sliceOffset);
+ collection.add(object);
+ }
+
+ offset.setValue(sliceOffset.intValue());
+ return collection;
+ } catch (Exception ex) {
+ throw Throwables.propagate(ex);
+ }
+ }
+
+ @Override
+ public CollectionT deserialize(Slice slice)
+ {
+ return deserialize(slice, new MutableInt(0));
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fd20718a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java
deleted file mode 100644
index 68d11c8..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import java.util.List;
-
-import javax.validation.constraints.NotNull;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * This is an implementation of {@link Serde} which serializes and deserializes lists.
- *
- * @since 3.5.0
- */
-@InterfaceStability.Evolving
-public class SerdeListSlice<T> implements Serde<List<T>, Slice>
-{
- @NotNull
- private Serde<T, Slice> serde;
-
- private SerdeListSlice()
- {
- // for Kryo
- }
-
- /**
- * Creates a {@link SerdeListSlice}.
- * @param serde The {@link Serde} that is used to serialize and deserialize each element of a list.
- */
- public SerdeListSlice(@NotNull Serde<T, Slice> serde)
- {
- this.serde = Preconditions.checkNotNull(serde);
- }
-
- @Override
- public Slice serialize(List<T> objects)
- {
- Slice[] slices = new Slice[objects.size()];
-
- int size = 4;
-
- for (int index = 0; index < objects.size(); index++) {
- Slice slice = serde.serialize(objects.get(index));
- slices[index] = slice;
- size += slice.length;
- }
-
- byte[] bytes = new byte[size];
- int offset = 0;
-
- byte[] sizeBytes = GPOUtils.serializeInt(objects.size());
- System.arraycopy(sizeBytes, 0, bytes, offset, 4);
- offset += 4;
-
- for (int index = 0; index < slices.length; index++) {
- Slice slice = slices[index];
- System.arraycopy(slice.buffer, slice.offset, bytes, offset, slice.length);
- offset += slice.length;
- }
-
- return new Slice(bytes);
- }
-
- @Override
- public List<T> deserialize(Slice slice, MutableInt offset)
- {
- MutableInt sliceOffset = new MutableInt(slice.offset + offset.intValue());
-
- int numElements = GPOUtils.deserializeInt(slice.buffer, sliceOffset);
- List<T> list = Lists.newArrayListWithCapacity(numElements);
- sliceOffset.subtract(slice.offset);
-
- for (int index = 0; index < numElements; index++) {
- T object = serde.deserialize(slice, sliceOffset);
- list.add(object);
- }
-
- offset.setValue(sliceOffset.intValue());
- return list;
- }
-
- @Override
- public List<T> deserialize(Slice slice)
- {
- return deserialize(slice, new MutableInt(0));
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fd20718a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
index 00ea58d..36e3557 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
@@ -18,6 +18,7 @@
*/
package org.apache.apex.malhar.lib.state.spillable;
+import java.util.ArrayList;
import java.util.List;
import org.junit.Assert;
@@ -27,7 +28,7 @@ import org.junit.runner.Description;
import org.apache.apex.malhar.lib.state.managed.ManagedStateTestUtils;
import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SerdeListSlice;
+import org.apache.apex.malhar.lib.utils.serde.SerdeCollectionSlice;
import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
import org.apache.commons.lang3.mutable.MutableInt;
@@ -44,18 +45,19 @@ import com.datatorrent.netlet.util.Slice;
public class SpillableTestUtils
{
public static SerdeStringSlice SERDE_STRING_SLICE = new SerdeStringSlice();
- public static SerdeListSlice<String> SERDE_STRING_LIST_SLICE = new SerdeListSlice(new SerdeStringSlice());
+ public static SerdeCollectionSlice<String, List<String>> SERDE_STRING_LIST_SLICE = new SerdeCollectionSlice<>(new SerdeStringSlice(),
+ (Class<List<String>>)(Class)ArrayList.class);
private SpillableTestUtils()
{
//Shouldn't instantiate this
}
- static class TestMeta extends TestWatcher
+ public static class TestMeta extends TestWatcher
{
- ManagedStateSpillableStateStore store;
- Context.OperatorContext operatorContext;
- String applicationPath;
+ public ManagedStateSpillableStateStore store;
+ public Context.OperatorContext operatorContext;
+ public String applicationPath;
@Override
protected void starting(Description description)
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fd20718a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java
new file mode 100644
index 0000000..f6085f6
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class SerdeCollectionSliceTest
+{
+ @Test
+ public void testSerdeList()
+ {
+ SerdeCollectionSlice<String, List<String>> serdeList =
+ new SerdeCollectionSlice<>(new SerdeStringSlice(), (Class<List<String>>)(Class)ArrayList.class);
+
+ List<String> stringList = Lists.newArrayList("a", "b", "c");
+
+ Slice slice = serdeList.serialize(stringList);
+
+ List<String> deserializedList = serdeList.deserialize(slice);
+
+ Assert.assertEquals(stringList, deserializedList);
+ }
+
+ @Test
+ public void testSerdeSet()
+ {
+ SerdeCollectionSlice<String, Set<String>> serdeSet =
+ new SerdeCollectionSlice<>(new SerdeStringSlice(), (Class<Set<String>>)(Class)HashSet.class);
+
+ Set<String> stringList = Sets.newHashSet("a", "b", "c");
+
+ Slice slice = serdeSet.serialize(stringList);
+
+ Set<String> deserializedSet = serdeSet.deserialize(slice);
+
+ Assert.assertEquals(stringList, deserializedSet);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fd20718a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSliceTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSliceTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSliceTest.java
deleted file mode 100644
index f7753d2..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSliceTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import java.util.List;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-import com.datatorrent.netlet.util.Slice;
-
-public class SerdeListSliceTest
-{
- @Test
- public void simpleSerdeTest()
- {
- SerdeListSlice<String> serdeList = new SerdeListSlice<String>(new SerdeStringSlice());
-
- List<String> stringList = Lists.newArrayList("a", "b", "c");
-
- Slice slice = serdeList.serialize(stringList);
-
- List<String> deserializedList = serdeList.deserialize(slice);
-
- Assert.assertEquals(stringList, deserializedList);
- }
-}
[2/2] apex-malhar git commit: Merge branch 'APEXMALHAR-2247' of
https://github.com/davidyan74/incubator-apex-malhar
Posted by hs...@apache.org.
Merge branch 'APEXMALHAR-2247' of https://github.com/davidyan74/incubator-apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/0a924ada
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/0a924ada
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/0a924ada
Branch: refs/heads/master
Commit: 0a924ada949bd7a9f362a24ccf12de9e85db6d22
Parents: e008114 fd20718
Author: Siyuan Hua <hs...@apache.org>
Authored: Tue Sep 20 16:49:33 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Tue Sep 20 16:49:33 2016 -0700
----------------------------------------------------------------------
.../state/spillable/SpillableArrayListImpl.java | 29 ++++-
.../lib/utils/serde/SerdeCollectionSlice.java | 120 +++++++++++++++++++
.../malhar/lib/utils/serde/SerdeListSlice.java | 111 -----------------
.../lib/state/spillable/SpillableTestUtils.java | 14 ++-
.../utils/serde/SerdeCollectionSliceTest.java | 65 ++++++++++
.../lib/utils/serde/SerdeListSliceTest.java | 45 -------
6 files changed, 219 insertions(+), 165 deletions(-)
----------------------------------------------------------------------