You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/19 19:31:19 UTC
[1/7] samza git commit: SAMZA-853 : Use Optional class from guava
instead of elasticsearch
Repository: samza
Updated Branches:
refs/heads/samza-sql fb4f12ed3 -> c50096291
SAMZA-853 : Use Optional class from guava instead of elasticsearch
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/73a93955
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/73a93955
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/73a93955
Branch: refs/heads/samza-sql
Commit: 73a939554c04a8725549fa8d954632d8ebdb2905
Parents: fc303e5
Author: Vadim Chekan <k....@gmail.com>
Authored: Wed Feb 3 14:48:27 2016 -0800
Committer: Navina <na...@gmail.com>
Committed: Wed Feb 3 14:48:27 2016 -0800
----------------------------------------------------------------------
build.gradle | 1 +
.../src/main/java/org/apache/samza/config/ElasticsearchConfig.java | 2 +-
.../elasticsearch/indexrequest/DefaultIndexRequestFactory.java | 2 +-
3 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/73a93955/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 0541def..16facbb 100644
--- a/build.gradle
+++ b/build.gradle
@@ -199,6 +199,7 @@ project(':samza-elasticsearch') {
compile project(':samza-api')
compile project(":samza-core_$scalaVersion")
compile "org.elasticsearch:elasticsearch:$elasticsearchVersion"
+ compile "com.google.guava:guava:$guavaVersion"
compile "org.slf4j:slf4j-api:$slf4jVersion"
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-all:$mockitoVersion"
http://git-wip-us.apache.org/repos/asf/samza/blob/73a93955/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java b/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java
index 6091feb..75bf4c7 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java
@@ -20,7 +20,7 @@
package org.apache.samza.config;
import org.apache.samza.SamzaException;
-import org.elasticsearch.common.base.Optional;
+import com.google.common.base.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/samza/blob/73a93955/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
index ddac22f..7f1e884 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
@@ -23,7 +23,7 @@ import org.apache.samza.SamzaException;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
-import org.elasticsearch.common.base.Optional;
+import com.google.common.base.Optional;
import org.elasticsearch.index.VersionType;
import java.util.Map;
[5/7] samza git commit: SAMZA-851: update Hello-Samza w/ CDH tutorial
documentation
Posted by ni...@apache.org.
SAMZA-851: update Hello-Samza w/ CDH tutorial documentation
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/21764303
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/21764303
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/21764303
Branch: refs/heads/samza-sql
Commit: 2176430379aa6355a27148d1384fbc342b96f24a
Parents: 495f2eb
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Tue Feb 9 21:18:38 2016 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Thu Feb 11 14:34:32 2016 -0800
----------------------------------------------------------------------
docs/learn/tutorials/versioned/deploy-samza-to-CDH.md | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/21764303/docs/learn/tutorials/versioned/deploy-samza-to-CDH.md
----------------------------------------------------------------------
diff --git a/docs/learn/tutorials/versioned/deploy-samza-to-CDH.md b/docs/learn/tutorials/versioned/deploy-samza-to-CDH.md
index 36df300..daf762b 100644
--- a/docs/learn/tutorials/versioned/deploy-samza-to-CDH.md
+++ b/docs/learn/tutorials/versioned/deploy-samza-to-CDH.md
@@ -19,7 +19,15 @@ title: Deploy Samza Job To CDH
limitations under the License.
-->
-The tutorial assumes you have successfully run [hello-samza](../../../startup/hello-samza/{{site.version}}/) and now you want to deploy the job to your Cloudera Data Hub ([CDH](http://www.cloudera.com/content/cloudera/en/products-and-services/cdh.html)). This tutorial is based on CDH 5.0.0 and uses hello-samza as the example job.
+The tutorial assumes you have successfully run [hello-samza](../../../startup/hello-samza/{{site.version}}/) and now you want to deploy the job to your Cloudera Data Hub ([CDH](http://www.cloudera.com/content/cloudera/en/products-and-services/cdh.html)). This tutorial is based on CDH 5.4.0 and uses hello-samza as the example job.
+
+### Compile Package for CDH 5.4.0
+
+We need to use a specific compile option to build hello-samza package for CDH 5.4.0
+
+{% highlight bash %}
+mvn clean package -Denv=cdh5.4.0
+{% endhighlight %}
### Upload Package to Cluster
[6/7] samza git commit: SAMZA-836: Fix rocksdb unit test failure
Posted by ni...@apache.org.
SAMZA-836: Fix rocksdb unit test failure
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4bcf88f5
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4bcf88f5
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4bcf88f5
Branch: refs/heads/samza-sql
Commit: 4bcf88f5e38ed18ff485a29fa6a472a922c0900d
Parents: 2176430
Author: Yi Pan <ni...@gmail.com>
Authored: Fri Feb 12 17:29:22 2016 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Fri Feb 12 17:29:22 2016 -0800
----------------------------------------------------------------------
.../samza/storage/kv/TestRocksDbKeyValueStore.scala | 12 +++++-------
1 file changed, 5 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/4bcf88f5/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
index 0c86a5a..b7f1cdc 100644
--- a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
@@ -23,14 +23,10 @@ package org.apache.samza.storage.kv
import java.io.File
import java.util
-import org.apache.samza.{SamzaException, Partition}
import org.apache.samza.config.MapConfig
-import org.apache.samza.container.{TaskName, SamzaContainerContext}
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.{NoOpMetricsRegistry, ExponentialSleepStrategy}
-import org.apache.samza.util.Util._
+import org.apache.samza.util.ExponentialSleepStrategy
import org.junit.{Assert, Test}
-import org.rocksdb.{RocksDB, FlushOptions, RocksDBException, Options}
+import org.rocksdb.{RocksDB, FlushOptions, Options}
class TestRocksDbKeyValueStore
{
@@ -70,7 +66,6 @@ class TestRocksDbKeyValueStore
def testFlush(): Unit = {
val map = new util.HashMap[String, String]()
val config = new MapConfig(map)
- val flushOptions = new FlushOptions().setWaitForFlush(true)
val options = new Options()
options.setCreateIfMissing(true)
val rocksDB = RocksDbKeyValueStore.openDB(new File(System.getProperty("java.io.tmpdir")),
@@ -80,6 +75,9 @@ class TestRocksDbKeyValueStore
"dbStore")
val key = "key".getBytes("UTF-8")
rocksDB.put(key, "val".getBytes("UTF-8"))
+ // SAMZA-836: Mysteriously,calling new FlushOptions() does not invoke the NativeLibraryLoader in rocksdbjni-3.13.1!
+ // Moving this line after calling new Options() resolve the issue.
+ val flushOptions = new FlushOptions().setWaitForFlush(true)
rocksDB.flush(flushOptions)
val dbDir = new File(System.getProperty("java.io.tmpdir")).toString
val rocksDBReadOnly = RocksDB.openReadOnly(options, dbDir)
[3/7] samza git commit: SAMZA-861: move KV-store public APIs to
samza-api module
Posted by ni...@apache.org.
SAMZA-861: move KV-store public APIs to samza-api module
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7c23e24f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7c23e24f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7c23e24f
Branch: refs/heads/samza-sql
Commit: 7c23e24f21b2e5d9aa2d5f2c66aea885d5ac4674
Parents: 6a5d49b
Author: Boris Shkolik <bo...@apache.org>
Authored: Mon Feb 8 11:10:18 2016 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Mon Feb 8 11:10:18 2016 -0800
----------------------------------------------------------------------
.../java/org/apache/samza/storage/kv/Entry.java | 46 ++++++
.../samza/storage/kv/KeyValueIterator.java | 26 +++
.../apache/samza/storage/kv/KeyValueStore.java | 163 +++++++++++++++++++
.../java/org/apache/samza/storage/kv/Entry.java | 46 ------
.../samza/storage/kv/KeyValueIterator.java | 26 ---
.../apache/samza/storage/kv/KeyValueStore.java | 163 -------------------
6 files changed, 235 insertions(+), 235 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/7c23e24f/samza-api/src/main/java/org/apache/samza/storage/kv/Entry.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/Entry.java b/samza-api/src/main/java/org/apache/samza/storage/kv/Entry.java
new file mode 100644
index 0000000..00e9f12
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/storage/kv/Entry.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+/**
+ * A key and value.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ */
+public class Entry<K, V> {
+
+ private final K key;
+ private final V value;
+
+ public Entry(K key, V value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public K getKey() {
+ return key;
+ }
+
+ public V getValue() {
+ return value;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/7c23e24f/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java
new file mode 100644
index 0000000..854ebbf
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import java.util.Iterator;
+
+public interface KeyValueIterator<K, V> extends Iterator<Entry<K, V>> {
+ public void close();
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/7c23e24f/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
new file mode 100644
index 0000000..b1fea7b
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A key-value store that supports put, get, delete, and range queries.
+ *
+ * @param <K> the type of keys maintained by this key-value store.
+ * @param <V> the type of values maintained by this key-value store.
+ */
+public interface KeyValueStore<K, V> {
+ /**
+ * Gets the value associated with the specified {@code key}.
+ *
+ * @param key the key with which the associated value is to be fetched.
+ * @return if found, the value associated with the specified {@code key}; otherwise, {@code null}.
+ * @throws NullPointerException if the specified {@code key} is {@code null}.
+ */
+ V get(K key);
+
+ /**
+ * Gets the values with which the specified {@code keys} are associated.
+ *
+ * @param keys the keys with which the associated values are to be fetched.
+ * @return a map of the keys that were found and their respective values.
+ * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
+ */
+ Map<K, V> getAll(List<K> keys);
+
+ /**
+ * Updates the mapping of the specified key-value pair; Associates the specified {@code key} with the specified {@code value}.
+ *
+ * @param key the key with which the specified {@code value} is to be associated.
+ * @param value the value with which the specified {@code key} is to be associated.
+ * @throws NullPointerException if the specified {@code key} or {@code value} is {@code null}.
+ */
+ void put(K key, V value);
+
+ /**
+ * Updates the mappings of the specified key-value {@code entries}.
+ *
+ * @param entries the updated mappings to put into this key-value store.
+ * @throws NullPointerException if any of the specified {@code entries} has {@code null} as key or value.
+ */
+ void putAll(List<Entry<K, V>> entries);
+
+ /**
+ * Deletes the mapping for the specified {@code key} from this key-value store (if such mapping exists).
+ *
+ * @param key the key for which the mapping is to be deleted.
+ * @throws NullPointerException if the specified {@code key} is {@code null}.
+ */
+ void delete(K key);
+
+ /**
+ * Deletes the mappings for the specified {@code keys} from this key-value store (if such mappings exist).
+ *
+ * @param keys the keys for which the mappings are to be deleted.
+ * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
+ */
+ void deleteAll(List<K> keys);
+
+ /**
+ * Returns an iterator for a sorted range of entries specified by [{@code from}, {@code to}).
+ *
+ * <p><b>API Note:</b> The returned iterator MUST be closed after use. The comparator used for finding entries that belong to the specified
+ * range compares the underlying serialized big-endian byte array representation of keys, lexicographically.
+ * @see <a href="http://en.wikipedia.org/wiki/Lexicographical_order">Lexicographical order article at Wikipedia</a></p>
+ * @param from the key specifying the low endpoint (inclusive) of the keys in the returned range.
+ * @param to the key specifying the high endpoint (exclusive) of the keys in the returned range.
+ * @return an iterator for the specified key range.
+ * @throws NullPointerException if null is used for {@code from} or {@code to}.
+ */
+ KeyValueIterator<K, V> range(K from, K to);
+
+ /**
+ * Returns an iterator for all entries in this key-value store.
+ *
+ * <p><b>API Note:</b> The returned iterator MUST be closed after use.</p>
+ * @return an iterator for all entries in this key-value store.
+ */
+ KeyValueIterator<K, V> all();
+
+ /**
+ * Closes this key-value store, if applicable, relinquishing any underlying resources.
+ */
+ void close();
+
+ /**
+ * Flushes this key-value store, if applicable.
+ */
+ void flush();
+
+ /**
+ * Represents an extension for classes that implement {@link KeyValueStore}.
+ */
+ // TODO replace with default interface methods when we can use Java 8 features.
+ class Extension {
+ private Extension() {
+ // This class cannot be instantiated
+ }
+
+ /**
+ * Gets the values with which the specified {@code keys} are associated.
+ *
+ * @param store the key-value store for which this operation is to be performed.
+ * @param keys the keys with which the associated values are to be fetched.
+ * @param <K> the type of keys maintained by the specified {@code store}.
+ * @param <V> the type of values maintained by the specified {@code store}.
+ * @return a map of the keys that were found and their respective values.
+ * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
+ */
+ public static <K, V> Map<K, V> getAll(final KeyValueStore<K, V> store, final List<K> keys) {
+ final Map<K, V> map = new HashMap<>(keys.size());
+
+ for (final K key : keys) {
+ final V value = store.get(key);
+
+ if (value != null) {
+ map.put(key, value);
+ }
+ }
+
+ return map;
+ }
+
+ /**
+ * Deletes the mappings for the specified {@code keys} from this key-value store (if such mappings exist).
+ *
+ * @param store the key-value store for which this operation is to be performed.
+ * @param keys the keys for which the mappings are to be deleted.
+ * @param <K> the type of keys maintained by the specified {@code store}.
+ * @param <V> the type of values maintained by the specified {@code store}.
+ * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
+ */
+ public static <K, V> void deleteAll(final KeyValueStore<K, V> store, final List<K> keys) {
+ for (final K key : keys) {
+ store.delete(key);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/7c23e24f/samza-kv/src/main/java/org/apache/samza/storage/kv/Entry.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/Entry.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/Entry.java
deleted file mode 100644
index 00e9f12..0000000
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/Entry.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv;
-
-/**
- * A key and value.
- *
- * @param <K> The key type
- * @param <V> The value type
- */
-public class Entry<K, V> {
-
- private final K key;
- private final V value;
-
- public Entry(K key, V value) {
- this.key = key;
- this.value = value;
- }
-
- public K getKey() {
- return key;
- }
-
- public V getValue() {
- return value;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/7c23e24f/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java
deleted file mode 100644
index 854ebbf..0000000
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv;
-
-import java.util.Iterator;
-
-public interface KeyValueIterator<K, V> extends Iterator<Entry<K, V>> {
- public void close();
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/7c23e24f/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
deleted file mode 100644
index b1fea7b..0000000
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A key-value store that supports put, get, delete, and range queries.
- *
- * @param <K> the type of keys maintained by this key-value store.
- * @param <V> the type of values maintained by this key-value store.
- */
-public interface KeyValueStore<K, V> {
- /**
- * Gets the value associated with the specified {@code key}.
- *
- * @param key the key with which the associated value is to be fetched.
- * @return if found, the value associated with the specified {@code key}; otherwise, {@code null}.
- * @throws NullPointerException if the specified {@code key} is {@code null}.
- */
- V get(K key);
-
- /**
- * Gets the values with which the specified {@code keys} are associated.
- *
- * @param keys the keys with which the associated values are to be fetched.
- * @return a map of the keys that were found and their respective values.
- * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
- */
- Map<K, V> getAll(List<K> keys);
-
- /**
- * Updates the mapping of the specified key-value pair; Associates the specified {@code key} with the specified {@code value}.
- *
- * @param key the key with which the specified {@code value} is to be associated.
- * @param value the value with which the specified {@code key} is to be associated.
- * @throws NullPointerException if the specified {@code key} or {@code value} is {@code null}.
- */
- void put(K key, V value);
-
- /**
- * Updates the mappings of the specified key-value {@code entries}.
- *
- * @param entries the updated mappings to put into this key-value store.
- * @throws NullPointerException if any of the specified {@code entries} has {@code null} as key or value.
- */
- void putAll(List<Entry<K, V>> entries);
-
- /**
- * Deletes the mapping for the specified {@code key} from this key-value store (if such mapping exists).
- *
- * @param key the key for which the mapping is to be deleted.
- * @throws NullPointerException if the specified {@code key} is {@code null}.
- */
- void delete(K key);
-
- /**
- * Deletes the mappings for the specified {@code keys} from this key-value store (if such mappings exist).
- *
- * @param keys the keys for which the mappings are to be deleted.
- * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
- */
- void deleteAll(List<K> keys);
-
- /**
- * Returns an iterator for a sorted range of entries specified by [{@code from}, {@code to}).
- *
- * <p><b>API Note:</b> The returned iterator MUST be closed after use. The comparator used for finding entries that belong to the specified
- * range compares the underlying serialized big-endian byte array representation of keys, lexicographically.
- * @see <a href="http://en.wikipedia.org/wiki/Lexicographical_order">Lexicographical order article at Wikipedia</a></p>
- * @param from the key specifying the low endpoint (inclusive) of the keys in the returned range.
- * @param to the key specifying the high endpoint (exclusive) of the keys in the returned range.
- * @return an iterator for the specified key range.
- * @throws NullPointerException if null is used for {@code from} or {@code to}.
- */
- KeyValueIterator<K, V> range(K from, K to);
-
- /**
- * Returns an iterator for all entries in this key-value store.
- *
- * <p><b>API Note:</b> The returned iterator MUST be closed after use.</p>
- * @return an iterator for all entries in this key-value store.
- */
- KeyValueIterator<K, V> all();
-
- /**
- * Closes this key-value store, if applicable, relinquishing any underlying resources.
- */
- void close();
-
- /**
- * Flushes this key-value store, if applicable.
- */
- void flush();
-
- /**
- * Represents an extension for classes that implement {@link KeyValueStore}.
- */
- // TODO replace with default interface methods when we can use Java 8 features.
- class Extension {
- private Extension() {
- // This class cannot be instantiated
- }
-
- /**
- * Gets the values with which the specified {@code keys} are associated.
- *
- * @param store the key-value store for which this operation is to be performed.
- * @param keys the keys with which the associated values are to be fetched.
- * @param <K> the type of keys maintained by the specified {@code store}.
- * @param <V> the type of values maintained by the specified {@code store}.
- * @return a map of the keys that were found and their respective values.
- * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
- */
- public static <K, V> Map<K, V> getAll(final KeyValueStore<K, V> store, final List<K> keys) {
- final Map<K, V> map = new HashMap<>(keys.size());
-
- for (final K key : keys) {
- final V value = store.get(key);
-
- if (value != null) {
- map.put(key, value);
- }
- }
-
- return map;
- }
-
- /**
- * Deletes the mappings for the specified {@code keys} from this key-value store (if such mappings exist).
- *
- * @param store the key-value store for which this operation is to be performed.
- * @param keys the keys for which the mappings are to be deleted.
- * @param <K> the type of keys maintained by the specified {@code store}.
- * @param <V> the type of values maintained by the specified {@code store}.
- * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
- */
- public static <K, V> void deleteAll(final KeyValueStore<K, V> store, final List<K> keys) {
- for (final K key : keys) {
- store.delete(key);
- }
- }
- }
-}
[7/7] samza git commit: Merge branch 'master' into samza-sql
Posted by ni...@apache.org.
Merge branch 'master' into samza-sql
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c5009629
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c5009629
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c5009629
Branch: refs/heads/samza-sql
Commit: c500962912041a70553cb39cf02d6faba23c6b60
Parents: fb4f12e 4bcf88f
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Wed Feb 17 09:49:30 2016 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Wed Feb 17 09:49:30 2016 -0800
----------------------------------------------------------------------
build.gradle | 1 +
.../tutorials/versioned/deploy-samza-to-CDH.md | 10 +-
.../java/org/apache/samza/storage/kv/Entry.java | 46 ++++++
.../samza/storage/kv/KeyValueIterator.java | 26 +++
.../apache/samza/storage/kv/KeyValueStore.java | 163 +++++++++++++++++++
.../samza/config/ElasticsearchConfig.java | 2 +-
.../DefaultIndexRequestFactory.java | 2 +-
.../samza/storage/kv/RocksDbKeyValueStore.scala | 2 +-
.../storage/kv/TestRocksDbKeyValueStore.scala | 12 +-
.../java/org/apache/samza/storage/kv/Entry.java | 46 ------
.../samza/storage/kv/KeyValueIterator.java | 26 ---
.../apache/samza/storage/kv/KeyValueStore.java | 163 -------------------
.../job/yarn/AbstractContainerAllocator.java | 33 +++-
.../samza/job/yarn/ContainerAllocator.java | 40 ++---
.../samza/job/yarn/ContainerRequestState.java | 58 +++----
.../job/yarn/HostAwareContainerAllocator.java | 87 ++++------
16 files changed, 363 insertions(+), 354 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/c5009629/build.gradle
----------------------------------------------------------------------
[4/7] samza git commit: SAMZA-866: Refactor container allocator
classes
Posted by ni...@apache.org.
SAMZA-866: Refactor container allocator classes
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/495f2eb8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/495f2eb8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/495f2eb8
Branch: refs/heads/samza-sql
Commit: 495f2eb8e66d0729f7d5b66cdcff47eda32ef8a6
Parents: 7c23e24
Author: Jacob Maes <ja...@gmail.com>
Authored: Tue Feb 9 23:30:21 2016 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Tue Feb 9 23:30:21 2016 -0800
----------------------------------------------------------------------
.../job/yarn/AbstractContainerAllocator.java | 33 +++++++-
.../samza/job/yarn/ContainerAllocator.java | 40 ++++-----
.../samza/job/yarn/ContainerRequestState.java | 58 ++++++-------
.../job/yarn/HostAwareContainerAllocator.java | 87 ++++++++------------
4 files changed, 110 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/495f2eb8/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
index 9ee2dac..2e192ee 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
@@ -24,6 +24,9 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.samza.config.YarnConfig;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* This class is responsible for making requests for containers to the AM and also, assigning a container to run on an allocated resource.
@@ -34,6 +37,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
* See {@link org.apache.samza.job.yarn.ContainerAllocator} and {@link org.apache.samza.job.yarn.HostAwareContainerAllocator}
*/
public abstract class AbstractContainerAllocator implements Runnable {
+ private static final Logger log = LoggerFactory.getLogger(AbstractContainerAllocator.class);
+
public static final String ANY_HOST = ContainerRequestState.ANY_HOST;
public static final int DEFAULT_PRIORITY = 0;
public static final int DEFAULT_CONTAINER_MEM = 1024;
@@ -45,9 +50,6 @@ public abstract class AbstractContainerAllocator implements Runnable {
protected final int containerMaxMemoryMb;
protected final int containerMaxCpuCore;
- @Override
- public abstract void run();
-
// containerRequestState indicate the state of all unfulfilled container requests and allocated containers
protected final ContainerRequestState containerRequestState;
@@ -66,6 +68,31 @@ public abstract class AbstractContainerAllocator implements Runnable {
this.containerMaxCpuCore = yarnConfig.getContainerMaxCpuCores();
}
+ /**
+ * Continuously assigns requested containers to the allocated containers provided by the cluster manager.
+ * The loop frequency is governed by thread sleeps for ALLOCATOR_SLEEP_TIME ms.
+ *
+ * Terminates when the isRunning flag is cleared.
+ */
+ @Override
+ public void run() {
+ while(isRunning.get()) {
+ try {
+ assignContainerRequests();
+ Thread.sleep(ALLOCATOR_SLEEP_TIME);
+ } catch (InterruptedException e) {
+ log.info("Got InterruptedException in AllocatorThread.", e);
+ } catch (Exception e) {
+ log.error("Got unknown Exception in AllocatorThread.", e);
+ }
+ }
+ }
+
+ /**
+ * Assigns the container requests from the queue to the allocated containers from the cluster manager and
+ * runs them.
+ */
+ protected abstract void assignContainerRequests();
/**
* Called during initial request for containers
http://git-wip-us.apache.org/repos/asf/samza/blob/495f2eb8/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
index 7c57a86..31fcc57 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
@@ -18,13 +18,13 @@
*/
package org.apache.samza.job.yarn;
+import java.util.List;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.samza.config.YarnConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
/**
* This is the default allocator thread that will be used by SamzaTaskManager.
@@ -42,36 +42,28 @@ public class ContainerAllocator extends AbstractContainerAllocator {
}
/**
- * During the run() method, the thread sleeps for ALLOCATOR_SLEEP_TIME ms. It tries to allocate any unsatisfied
- * request that is still in the request queue (See requests in {@link org.apache.samza.job.yarn.ContainerRequestState})
+ * This method tries to allocate any unsatisfied request that is still in the request queue
+ * (See requests in {@link org.apache.samza.job.yarn.ContainerRequestState})
* with allocated containers, if any.
*
* Since host-affinity is not enabled, all allocated container resources are buffered in the list keyed by "ANY_HOST".
* */
@Override
- public void run() {
- while(isRunning.get()) {
- try {
- List<Container> allocatedContainers = containerRequestState.getContainersOnAHost(ANY_HOST);
- while (!containerRequestState.getRequestsQueue().isEmpty() && allocatedContainers != null && allocatedContainers.size() > 0) {
- SamzaContainerRequest request = containerRequestState.getRequestsQueue().peek();
- Container container = allocatedContainers.get(0);
-
- // Update state
- containerRequestState.updateStateAfterAssignment(request, ANY_HOST, container);
+ public void assignContainerRequests() {
+ List<Container> allocatedContainers = containerRequestState.getContainersOnAHost(ANY_HOST);
+ while (!containerRequestState.getRequestsQueue().isEmpty() && allocatedContainers != null && allocatedContainers.size() > 0) {
+ SamzaContainerRequest request = containerRequestState.getRequestsQueue().peek();
+ Container container = allocatedContainers.get(0);
- // Cancel request and run container
- log.info("Running {} on {}", request.expectedContainerId, container.getId());
- containerUtil.runContainer(request.expectedContainerId, container);
- }
+ // Update state
+ containerRequestState.updateStateAfterAssignment(request, ANY_HOST, container);
- // If requestQueue is empty, all extra containers in the buffer should be released.
- containerRequestState.releaseExtraContainers();
-
- Thread.sleep(ALLOCATOR_SLEEP_TIME);
- } catch (InterruptedException e) {
- log.info("Got InterruptedException in AllocatorThread. Pending Container request(s) cannot be fulfilled!!", e);
- }
+ // Cancel request and run container
+ log.info("Running {} on {}", request.expectedContainerId, container.getId());
+ containerUtil.runContainer(request.expectedContainerId, container);
}
+
+ // If requestQueue is empty, all extra containers in the buffer should be released.
+ containerRequestState.releaseExtraContainers();
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/495f2eb8/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
index ab3061e..54db5e5 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
@@ -18,18 +18,17 @@
*/
package org.apache.samza.job.yarn;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This class maintains the state variables for all the container requests and the allocated containers returned
@@ -205,35 +204,35 @@ public class ContainerRequestState {
public synchronized int releaseExtraContainers() {
int numReleasedContainers = 0;
- if (hostAffinityEnabled) {
- if (requestsQueue.isEmpty()) {
- log.debug("Container Requests Queue is empty.");
+ if (requestsQueue.isEmpty()) {
+ log.debug("Container Requests Queue is empty.");
+ if (hostAffinityEnabled) {
List<String> allocatedHosts = getAllocatedHosts();
for (String host : allocatedHosts) {
- List<Container> containers = getContainersOnAHost(host);
- if (containers != null) {
- for (Container c : containers) {
- log.info("Releasing extra container {} allocated on {}", c.getId(), host);
- amClient.releaseAssignedContainer(c.getId());
- numReleasedContainers++;
- }
- }
+ numReleasedContainers += releaseContainersForHost(host);
}
- clearState();
+ } else {
+ numReleasedContainers += releaseContainersForHost(ANY_HOST);
}
- } else {
- if (requestsQueue.isEmpty()) {
- log.debug("No more pending requests in Container Requests Queue.");
+ clearState();
+ }
+ return numReleasedContainers;
+ }
- List<Container> availableContainers = getContainersOnAHost(ANY_HOST);
- while(availableContainers != null && !availableContainers.isEmpty()) {
- Container c = availableContainers.remove(0);
- log.info("Releasing extra allocated container - {}", c.getId());
- amClient.releaseAssignedContainer(c.getId());
- numReleasedContainers++;
- }
- clearState();
+ /**
+ * Releases all allocated containers for the specified host.
+ * @param host the host for which the containers should be released.
+ * @return the number of containers released.
+ */
+ private int releaseContainersForHost(String host) {
+ int numReleasedContainers = 0;
+ List<Container> containers = getContainersOnAHost(host);
+ if (containers != null) {
+ for (Container c : containers) {
+ log.info("Releasing extra container {} allocated on {}", c.getId(), host);
+ amClient.releaseAssignedContainer(c.getId());
+ numReleasedContainers++;
}
}
return numReleasedContainers;
@@ -242,6 +241,7 @@ public class ContainerRequestState {
/**
* Clears all the state variables
* Performed when there are no more unfulfilled requests
+ * This is not synchronized because it is private.
*/
private void clearState() {
allocatedContainers.clear();
http://git-wip-us.apache.org/repos/asf/samza/blob/495f2eb8/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
index ff22dbf..8e1db77 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
@@ -18,13 +18,13 @@
*/
package org.apache.samza.job.yarn;
+import java.util.List;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.samza.config.YarnConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
/**
* This is the allocator thread that will be used by SamzaTaskManager when host-affinity is enabled for a job. It is similar to {@link org.apache.samza.job.yarn.ContainerAllocator}, except that it considers container locality for allocation.
@@ -55,66 +55,49 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator {
* allocatedContainers buffer keyed by "ANY_HOST".
*/
@Override
- public void run() {
- try {
- while (isRunning.get()) {
- while (!containerRequestState.getRequestsQueue().isEmpty()) {
- SamzaContainerRequest request = containerRequestState.getRequestsQueue().peek();
- String preferredHost = request.getPreferredHost();
- int expectedContainerId = request.getExpectedContainerId();
+ public void assignContainerRequests() {
+ while (!containerRequestState.getRequestsQueue().isEmpty()) {
+ SamzaContainerRequest request = containerRequestState.getRequestsQueue().peek();
+ String preferredHost = request.getPreferredHost();
+ int expectedContainerId = request.getExpectedContainerId();
- log.info(
- "Handling request for container id {} on preferred host {}",
- expectedContainerId,
- preferredHost);
+ log.info("Handling request for container id {} on preferred host {}", expectedContainerId, preferredHost);
- List<Container> allocatedContainers = containerRequestState.getContainersOnAHost(preferredHost);
- if (allocatedContainers != null && allocatedContainers.size() > 0) {
- // Found allocated container at preferredHost
- Container container = allocatedContainers.get(0);
+ List<Container> allocatedContainers = containerRequestState.getContainersOnAHost(preferredHost);
+ if (allocatedContainers != null && allocatedContainers.size() > 0) {
+ // Found allocated container at preferredHost
+ Container container = allocatedContainers.get(0);
- containerRequestState.updateStateAfterAssignment(request, preferredHost, container);
+ containerRequestState.updateStateAfterAssignment(request, preferredHost, container);
+ log.info("Running {} on {}", expectedContainerId, container.getId());
+ containerUtil.runMatchedContainer(expectedContainerId, container);
+ } else {
+ // No allocated container on preferredHost
+ log.info("Did not find any allocated containers on preferred host {} for running container id {}",
+ preferredHost, expectedContainerId);
+ boolean expired = requestExpired(request);
+ allocatedContainers = containerRequestState.getContainersOnAHost(ANY_HOST);
+ if (!expired || allocatedContainers == null || allocatedContainers.size() == 0) {
+ log.info("Either the request timestamp {} is greater than container request timeout {}ms or we couldn't "
+ + "find any free allocated containers in the buffer. Breaking out of loop.",
+ request.getRequestTimestamp(), CONTAINER_REQUEST_TIMEOUT);
+ break;
+ } else {
+ if (allocatedContainers.size() > 0) {
+ Container container = allocatedContainers.get(0);
+ log.info("Found available containers on ANY_HOST. Assigning request for container_id {} with "
+ + "timestamp {} to container {}",
+ new Object[]{String.valueOf(expectedContainerId), request.getRequestTimestamp(), container.getId()});
+ containerRequestState.updateStateAfterAssignment(request, ANY_HOST, container);
log.info("Running {} on {}", expectedContainerId, container.getId());
- containerUtil.runMatchedContainer(expectedContainerId, container);
- } else {
- // No allocated container on preferredHost
- log.info(
- "Did not find any allocated containers on preferred host {} for running container id {}",
- preferredHost,
- expectedContainerId);
- boolean expired = requestExpired(request);
- allocatedContainers = containerRequestState.getContainersOnAHost(ANY_HOST);
- if (!expired || allocatedContainers == null || allocatedContainers.size() == 0) {
- log.info("Either the request timestamp {} is greater than container request timeout {}ms or we couldn't " +
- "find any free allocated containers in the buffer. Breaking out of loop.",
- request.getRequestTimestamp(),
- CONTAINER_REQUEST_TIMEOUT);
- break;
- } else {
- if (allocatedContainers.size() > 0) {
- Container container = allocatedContainers.get(0);
- log.info("Found available containers on ANY_HOST. Assigning request for container_id {} with " +
- "timestamp {} to container {}",
- new Object[] { String.valueOf(expectedContainerId), request.getRequestTimestamp(), container.getId()
- });
- containerRequestState.updateStateAfterAssignment(request, ANY_HOST, container);
- log.info("Running {} on {}", expectedContainerId, container.getId());
- containerUtil.runContainer(expectedContainerId, container);
- }
- }
+ containerUtil.runContainer(expectedContainerId, container);
}
}
- // Release extra containers and update the entire system's state
- containerRequestState.releaseExtraContainers();
-
- Thread.sleep(ALLOCATOR_SLEEP_TIME);
}
- } catch (InterruptedException ie) {
- log.info("Got an InterruptedException in HostAwareContainerAllocator thread!", ie);
- } catch (Exception e) {
- log.info("Got an unknown Exception in HostAwareContainerAllocator thread!", e);
}
+ // Release extra containers and update the entire system's state
+ containerRequestState.releaseExtraContainers();
}
private boolean requestExpired(SamzaContainerRequest request) {
[2/7] samza git commit: SAMZA-862 : change log level to warn for
RocksDB with TTL and changelog
Posted by ni...@apache.org.
SAMZA-862 : change log level to warn for RocksDB with TTL and changelog
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6a5d49b8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6a5d49b8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6a5d49b8
Branch: refs/heads/samza-sql
Commit: 6a5d49b84939389cbfebd50682863ae4ee865ec0
Parents: 73a9395
Author: Boris Shkolnik <bo...@apache.org>
Authored: Wed Feb 3 15:19:46 2016 -0800
Committer: Navina <na...@gmail.com>
Committed: Wed Feb 3 15:20:00 2016 -0800
----------------------------------------------------------------------
.../scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/6a5d49b8/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index 211fc3b..d614f8a 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -52,7 +52,7 @@ object RocksDbKeyValueStore extends Logging {
useTTL = true
if (isLoggedStore)
{
- error("%s is a TTL based store, changelog is not supported for TTL based stores, use at your own discretion" format storeName)
+ warn("%s is a TTL based store, changelog is not supported for TTL based stores, use at your own discretion" format storeName)
}
}
catch