You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/19 19:31:19 UTC

[1/7] samza git commit: SAMZA-853 : Use Optional class from guava instead of elasticsearch

Repository: samza
Updated Branches:
  refs/heads/samza-sql fb4f12ed3 -> c50096291


SAMZA-853 : Use Optional class from guava instead of elasticsearch


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/73a93955
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/73a93955
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/73a93955

Branch: refs/heads/samza-sql
Commit: 73a939554c04a8725549fa8d954632d8ebdb2905
Parents: fc303e5
Author: Vadim Chekan <k....@gmail.com>
Authored: Wed Feb 3 14:48:27 2016 -0800
Committer: Navina <na...@gmail.com>
Committed: Wed Feb 3 14:48:27 2016 -0800

----------------------------------------------------------------------
 build.gradle                                                       | 1 +
 .../src/main/java/org/apache/samza/config/ElasticsearchConfig.java | 2 +-
 .../elasticsearch/indexrequest/DefaultIndexRequestFactory.java     | 2 +-
 3 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/73a93955/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 0541def..16facbb 100644
--- a/build.gradle
+++ b/build.gradle
@@ -199,6 +199,7 @@ project(':samza-elasticsearch') {
     compile project(':samza-api')
     compile project(":samza-core_$scalaVersion")
     compile "org.elasticsearch:elasticsearch:$elasticsearchVersion"
+    compile "com.google.guava:guava:$guavaVersion"
     compile "org.slf4j:slf4j-api:$slf4jVersion"
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"

http://git-wip-us.apache.org/repos/asf/samza/blob/73a93955/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java b/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java
index 6091feb..75bf4c7 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java
@@ -20,7 +20,7 @@
 package org.apache.samza.config;
 
 import org.apache.samza.SamzaException;
-import org.elasticsearch.common.base.Optional;
+import com.google.common.base.Optional;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/73a93955/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
index ddac22f..7f1e884 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
@@ -23,7 +23,7 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Requests;
-import org.elasticsearch.common.base.Optional;
+import com.google.common.base.Optional;
 import org.elasticsearch.index.VersionType;
 
 import java.util.Map;


[5/7] samza git commit: SAMZA-851: update Hello-Samza w/ CDH tutorial documentation

Posted by ni...@apache.org.
SAMZA-851: update Hello-Samza w/ CDH tutorial documentation


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/21764303
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/21764303
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/21764303

Branch: refs/heads/samza-sql
Commit: 2176430379aa6355a27148d1384fbc342b96f24a
Parents: 495f2eb
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Tue Feb 9 21:18:38 2016 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Thu Feb 11 14:34:32 2016 -0800

----------------------------------------------------------------------
 docs/learn/tutorials/versioned/deploy-samza-to-CDH.md | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/21764303/docs/learn/tutorials/versioned/deploy-samza-to-CDH.md
----------------------------------------------------------------------
diff --git a/docs/learn/tutorials/versioned/deploy-samza-to-CDH.md b/docs/learn/tutorials/versioned/deploy-samza-to-CDH.md
index 36df300..daf762b 100644
--- a/docs/learn/tutorials/versioned/deploy-samza-to-CDH.md
+++ b/docs/learn/tutorials/versioned/deploy-samza-to-CDH.md
@@ -19,7 +19,15 @@ title: Deploy Samza Job To CDH
    limitations under the License.
 -->
 
-The tutorial assumes you have successfully run [hello-samza](../../../startup/hello-samza/{{site.version}}/) and now you want to deploy the job to your Cloudera Data Hub ([CDH](http://www.cloudera.com/content/cloudera/en/products-and-services/cdh.html)). This tutorial is based on CDH 5.0.0 and uses hello-samza as the example job.
+The tutorial assumes you have successfully run [hello-samza](../../../startup/hello-samza/{{site.version}}/) and now you want to deploy the job to your Cloudera Data Hub ([CDH](http://www.cloudera.com/content/cloudera/en/products-and-services/cdh.html)). This tutorial is based on CDH 5.4.0 and uses hello-samza as the example job.
+
+### Compile Package for CDH 5.4.0
+
+We need to use a specific compile option to build hello-samza package for CDH 5.4.0
+
+{% highlight bash %}
+mvn clean package -Denv=cdh5.4.0
+{% endhighlight %}
 
 ### Upload Package to Cluster
 


[6/7] samza git commit: SAMZA-836: Fix rocksdb unit test failure

Posted by ni...@apache.org.
SAMZA-836: Fix rocksdb unit test failure


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4bcf88f5
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4bcf88f5
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4bcf88f5

Branch: refs/heads/samza-sql
Commit: 4bcf88f5e38ed18ff485a29fa6a472a922c0900d
Parents: 2176430
Author: Yi Pan <ni...@gmail.com>
Authored: Fri Feb 12 17:29:22 2016 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Fri Feb 12 17:29:22 2016 -0800

----------------------------------------------------------------------
 .../samza/storage/kv/TestRocksDbKeyValueStore.scala     | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/4bcf88f5/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
index 0c86a5a..b7f1cdc 100644
--- a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
@@ -23,14 +23,10 @@ package org.apache.samza.storage.kv
 import java.io.File
 import java.util
 
-import org.apache.samza.{SamzaException, Partition}
 import org.apache.samza.config.MapConfig
-import org.apache.samza.container.{TaskName, SamzaContainerContext}
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.{NoOpMetricsRegistry, ExponentialSleepStrategy}
-import org.apache.samza.util.Util._
+import org.apache.samza.util.ExponentialSleepStrategy
 import org.junit.{Assert, Test}
-import org.rocksdb.{RocksDB, FlushOptions, RocksDBException, Options}
+import org.rocksdb.{RocksDB, FlushOptions, Options}
 
 class TestRocksDbKeyValueStore
 {
@@ -70,7 +66,6 @@ class TestRocksDbKeyValueStore
   def testFlush(): Unit = {
     val map = new util.HashMap[String, String]()
     val config = new MapConfig(map)
-    val flushOptions = new FlushOptions().setWaitForFlush(true)
     val options = new Options()
     options.setCreateIfMissing(true)
     val rocksDB = RocksDbKeyValueStore.openDB(new File(System.getProperty("java.io.tmpdir")),
@@ -80,6 +75,9 @@ class TestRocksDbKeyValueStore
                                               "dbStore")
     val key = "key".getBytes("UTF-8")
     rocksDB.put(key, "val".getBytes("UTF-8"))
+    // SAMZA-836: Mysteriously,calling new FlushOptions() does not invoke the NativeLibraryLoader in rocksdbjni-3.13.1!
+    // Moving this line after calling new Options() resolve the issue.
+    val flushOptions = new FlushOptions().setWaitForFlush(true)
     rocksDB.flush(flushOptions)
     val dbDir = new File(System.getProperty("java.io.tmpdir")).toString
     val rocksDBReadOnly = RocksDB.openReadOnly(options, dbDir)


[3/7] samza git commit: SAMZA-861: move KV-store public APIs to samza-api module

Posted by ni...@apache.org.
SAMZA-861: move KV-store public APIs to samza-api module


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7c23e24f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7c23e24f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7c23e24f

Branch: refs/heads/samza-sql
Commit: 7c23e24f21b2e5d9aa2d5f2c66aea885d5ac4674
Parents: 6a5d49b
Author: Boris Shkolik <bo...@apache.org>
Authored: Mon Feb 8 11:10:18 2016 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Mon Feb 8 11:10:18 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/samza/storage/kv/Entry.java |  46 ++++++
 .../samza/storage/kv/KeyValueIterator.java      |  26 +++
 .../apache/samza/storage/kv/KeyValueStore.java  | 163 +++++++++++++++++++
 .../java/org/apache/samza/storage/kv/Entry.java |  46 ------
 .../samza/storage/kv/KeyValueIterator.java      |  26 ---
 .../apache/samza/storage/kv/KeyValueStore.java  | 163 -------------------
 6 files changed, 235 insertions(+), 235 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/7c23e24f/samza-api/src/main/java/org/apache/samza/storage/kv/Entry.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/Entry.java b/samza-api/src/main/java/org/apache/samza/storage/kv/Entry.java
new file mode 100644
index 0000000..00e9f12
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/storage/kv/Entry.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+/**
+ * A key and value.
+ * 
+ * @param <K> The key type
+ * @param <V> The value type
+ */
+public class Entry<K, V> {
+
+  private final K key;
+  private final V value;
+  
+  public Entry(K key, V value) {
+    this.key = key;
+    this.value = value;
+  }
+
+  public K getKey() {
+    return key;
+  }
+
+  public V getValue() {
+    return value;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/7c23e24f/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java
new file mode 100644
index 0000000..854ebbf
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import java.util.Iterator;
+
+public interface KeyValueIterator<K, V> extends Iterator<Entry<K, V>> {
+  public void close();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/7c23e24f/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
new file mode 100644
index 0000000..b1fea7b
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A key-value store that supports put, get, delete, and range queries.
+ *
+ * @param <K> the type of keys maintained by this key-value store.
+ * @param <V> the type of values maintained by this key-value store.
+ */
+public interface KeyValueStore<K, V> {
+  /**
+   * Gets the value associated with the specified {@code key}.
+   *
+   * @param key the key with which the associated value is to be fetched.
+   * @return if found, the value associated with the specified {@code key}; otherwise, {@code null}.
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   */
+  V get(K key);
+
+  /**
+   * Gets the values with which the specified {@code keys} are associated.
+   *
+   * @param keys the keys with which the associated values are to be fetched.
+   * @return a map of the keys that were found and their respective values.
+   * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
+   */
+  Map<K, V> getAll(List<K> keys);
+
+  /**
+   * Updates the mapping of the specified key-value pair; Associates the specified {@code key} with the specified {@code value}.
+   *
+   * @param key the key with which the specified {@code value} is to be associated.
+   * @param value the value with which the specified {@code key} is to be associated.
+   * @throws NullPointerException if the specified {@code key} or {@code value} is {@code null}.
+   */
+  void put(K key, V value);
+
+  /**
+   * Updates the mappings of the specified key-value {@code entries}.
+   *
+   * @param entries the updated mappings to put into this key-value store.
+   * @throws NullPointerException if any of the specified {@code entries} has {@code null} as key or value.
+   */
+  void putAll(List<Entry<K, V>> entries);
+
+  /**
+   * Deletes the mapping for the specified {@code key} from this key-value store (if such mapping exists).
+   *
+   * @param key the key for which the mapping is to be deleted.
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   */
+  void delete(K key);
+
+  /**
+   * Deletes the mappings for the specified {@code keys} from this key-value store (if such mappings exist).
+   *
+   * @param keys the keys for which the mappings are to be deleted.
+   * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
+   */
+  void deleteAll(List<K> keys);
+
+  /**
+   * Returns an iterator for a sorted range of entries specified by [{@code from}, {@code to}).
+   *
+   * <p><b>API Note:</b> The returned iterator MUST be closed after use. The comparator used for finding entries that belong to the specified
+   * range compares the underlying serialized big-endian byte array representation of keys, lexicographically.
+   * @see <a href="http://en.wikipedia.org/wiki/Lexicographical_order">Lexicographical order article at Wikipedia</a></p>
+   * @param from the key specifying the low endpoint (inclusive) of the keys in the returned range.
+   * @param to the key specifying the high endpoint (exclusive) of the keys in the returned range.
+   * @return an iterator for the specified key range.
+   * @throws NullPointerException if null is used for {@code from} or {@code to}.
+   */
+  KeyValueIterator<K, V> range(K from, K to);
+
+  /**
+   * Returns an iterator for all entries in this key-value store.
+   *
+   * <p><b>API Note:</b> The returned iterator MUST be closed after use.</p>
+   * @return an iterator for all entries in this key-value store.
+   */
+  KeyValueIterator<K, V> all();
+
+  /**
+   * Closes this key-value store, if applicable, relinquishing any underlying resources.
+   */
+  void close();
+
+  /**
+   * Flushes this key-value store, if applicable.
+   */
+  void flush();
+
+  /**
+   * Represents an extension for classes that implement {@link KeyValueStore}.
+   */
+  // TODO replace with default interface methods when we can use Java 8 features.
+  class Extension {
+    private Extension() {
+      // This class cannot be instantiated
+    }
+
+    /**
+     * Gets the values with which the specified {@code keys} are associated.
+     *
+     * @param store the key-value store for which this operation is to be performed.
+     * @param keys the keys with which the associated values are to be fetched.
+     * @param <K> the type of keys maintained by the specified {@code store}.
+     * @param <V> the type of values maintained by the specified {@code store}.
+     * @return a map of the keys that were found and their respective values.
+     * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
+     */
+    public static <K, V> Map<K, V> getAll(final KeyValueStore<K, V> store, final List<K> keys) {
+      final Map<K, V> map = new HashMap<>(keys.size());
+
+      for (final K key : keys) {
+        final V value = store.get(key);
+
+        if (value != null) {
+          map.put(key, value);
+        }
+      }
+
+      return map;
+    }
+
+    /**
+     * Deletes the mappings for the specified {@code keys} from this key-value store (if such mappings exist).
+     *
+     * @param store the key-value store for which this operation is to be performed.
+     * @param keys the keys for which the mappings are to be deleted.
+     * @param <K> the type of keys maintained by the specified {@code store}.
+     * @param <V> the type of values maintained by the specified {@code store}.
+     * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
+     */
+    public static <K, V> void deleteAll(final KeyValueStore<K, V> store, final List<K> keys) {
+      for (final K key : keys) {
+        store.delete(key);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/7c23e24f/samza-kv/src/main/java/org/apache/samza/storage/kv/Entry.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/Entry.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/Entry.java
deleted file mode 100644
index 00e9f12..0000000
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/Entry.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv;
-
-/**
- * A key and value.
- * 
- * @param <K> The key type
- * @param <V> The value type
- */
-public class Entry<K, V> {
-
-  private final K key;
-  private final V value;
-  
-  public Entry(K key, V value) {
-    this.key = key;
-    this.value = value;
-  }
-
-  public K getKey() {
-    return key;
-  }
-
-  public V getValue() {
-    return value;
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/7c23e24f/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java
deleted file mode 100644
index 854ebbf..0000000
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv;
-
-import java.util.Iterator;
-
-public interface KeyValueIterator<K, V> extends Iterator<Entry<K, V>> {
-  public void close();
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/7c23e24f/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
deleted file mode 100644
index b1fea7b..0000000
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A key-value store that supports put, get, delete, and range queries.
- *
- * @param <K> the type of keys maintained by this key-value store.
- * @param <V> the type of values maintained by this key-value store.
- */
-public interface KeyValueStore<K, V> {
-  /**
-   * Gets the value associated with the specified {@code key}.
-   *
-   * @param key the key with which the associated value is to be fetched.
-   * @return if found, the value associated with the specified {@code key}; otherwise, {@code null}.
-   * @throws NullPointerException if the specified {@code key} is {@code null}.
-   */
-  V get(K key);
-
-  /**
-   * Gets the values with which the specified {@code keys} are associated.
-   *
-   * @param keys the keys with which the associated values are to be fetched.
-   * @return a map of the keys that were found and their respective values.
-   * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
-   */
-  Map<K, V> getAll(List<K> keys);
-
-  /**
-   * Updates the mapping of the specified key-value pair; Associates the specified {@code key} with the specified {@code value}.
-   *
-   * @param key the key with which the specified {@code value} is to be associated.
-   * @param value the value with which the specified {@code key} is to be associated.
-   * @throws NullPointerException if the specified {@code key} or {@code value} is {@code null}.
-   */
-  void put(K key, V value);
-
-  /**
-   * Updates the mappings of the specified key-value {@code entries}.
-   *
-   * @param entries the updated mappings to put into this key-value store.
-   * @throws NullPointerException if any of the specified {@code entries} has {@code null} as key or value.
-   */
-  void putAll(List<Entry<K, V>> entries);
-
-  /**
-   * Deletes the mapping for the specified {@code key} from this key-value store (if such mapping exists).
-   *
-   * @param key the key for which the mapping is to be deleted.
-   * @throws NullPointerException if the specified {@code key} is {@code null}.
-   */
-  void delete(K key);
-
-  /**
-   * Deletes the mappings for the specified {@code keys} from this key-value store (if such mappings exist).
-   *
-   * @param keys the keys for which the mappings are to be deleted.
-   * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
-   */
-  void deleteAll(List<K> keys);
-
-  /**
-   * Returns an iterator for a sorted range of entries specified by [{@code from}, {@code to}).
-   *
-   * <p><b>API Note:</b> The returned iterator MUST be closed after use. The comparator used for finding entries that belong to the specified
-   * range compares the underlying serialized big-endian byte array representation of keys, lexicographically.
-   * @see <a href="http://en.wikipedia.org/wiki/Lexicographical_order">Lexicographical order article at Wikipedia</a></p>
-   * @param from the key specifying the low endpoint (inclusive) of the keys in the returned range.
-   * @param to the key specifying the high endpoint (exclusive) of the keys in the returned range.
-   * @return an iterator for the specified key range.
-   * @throws NullPointerException if null is used for {@code from} or {@code to}.
-   */
-  KeyValueIterator<K, V> range(K from, K to);
-
-  /**
-   * Returns an iterator for all entries in this key-value store.
-   *
-   * <p><b>API Note:</b> The returned iterator MUST be closed after use.</p>
-   * @return an iterator for all entries in this key-value store.
-   */
-  KeyValueIterator<K, V> all();
-
-  /**
-   * Closes this key-value store, if applicable, relinquishing any underlying resources.
-   */
-  void close();
-
-  /**
-   * Flushes this key-value store, if applicable.
-   */
-  void flush();
-
-  /**
-   * Represents an extension for classes that implement {@link KeyValueStore}.
-   */
-  // TODO replace with default interface methods when we can use Java 8 features.
-  class Extension {
-    private Extension() {
-      // This class cannot be instantiated
-    }
-
-    /**
-     * Gets the values with which the specified {@code keys} are associated.
-     *
-     * @param store the key-value store for which this operation is to be performed.
-     * @param keys the keys with which the associated values are to be fetched.
-     * @param <K> the type of keys maintained by the specified {@code store}.
-     * @param <V> the type of values maintained by the specified {@code store}.
-     * @return a map of the keys that were found and their respective values.
-     * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
-     */
-    public static <K, V> Map<K, V> getAll(final KeyValueStore<K, V> store, final List<K> keys) {
-      final Map<K, V> map = new HashMap<>(keys.size());
-
-      for (final K key : keys) {
-        final V value = store.get(key);
-
-        if (value != null) {
-          map.put(key, value);
-        }
-      }
-
-      return map;
-    }
-
-    /**
-     * Deletes the mappings for the specified {@code keys} from this key-value store (if such mappings exist).
-     *
-     * @param store the key-value store for which this operation is to be performed.
-     * @param keys the keys for which the mappings are to be deleted.
-     * @param <K> the type of keys maintained by the specified {@code store}.
-     * @param <V> the type of values maintained by the specified {@code store}.
-     * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
-     */
-    public static <K, V> void deleteAll(final KeyValueStore<K, V> store, final List<K> keys) {
-      for (final K key : keys) {
-        store.delete(key);
-      }
-    }
-  }
-}


[7/7] samza git commit: Merge branch 'master' into samza-sql

Posted by ni...@apache.org.
Merge branch 'master' into samza-sql


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c5009629
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c5009629
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c5009629

Branch: refs/heads/samza-sql
Commit: c500962912041a70553cb39cf02d6faba23c6b60
Parents: fb4f12e 4bcf88f
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Wed Feb 17 09:49:30 2016 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Wed Feb 17 09:49:30 2016 -0800

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 .../tutorials/versioned/deploy-samza-to-CDH.md  |  10 +-
 .../java/org/apache/samza/storage/kv/Entry.java |  46 ++++++
 .../samza/storage/kv/KeyValueIterator.java      |  26 +++
 .../apache/samza/storage/kv/KeyValueStore.java  | 163 +++++++++++++++++++
 .../samza/config/ElasticsearchConfig.java       |   2 +-
 .../DefaultIndexRequestFactory.java             |   2 +-
 .../samza/storage/kv/RocksDbKeyValueStore.scala |   2 +-
 .../storage/kv/TestRocksDbKeyValueStore.scala   |  12 +-
 .../java/org/apache/samza/storage/kv/Entry.java |  46 ------
 .../samza/storage/kv/KeyValueIterator.java      |  26 ---
 .../apache/samza/storage/kv/KeyValueStore.java  | 163 -------------------
 .../job/yarn/AbstractContainerAllocator.java    |  33 +++-
 .../samza/job/yarn/ContainerAllocator.java      |  40 ++---
 .../samza/job/yarn/ContainerRequestState.java   |  58 +++----
 .../job/yarn/HostAwareContainerAllocator.java   |  87 ++++------
 16 files changed, 363 insertions(+), 354 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/c5009629/build.gradle
----------------------------------------------------------------------


[4/7] samza git commit: SAMZA-866: Refactor container allocator classes

Posted by ni...@apache.org.
SAMZA-866: Refactor container allocator classes


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/495f2eb8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/495f2eb8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/495f2eb8

Branch: refs/heads/samza-sql
Commit: 495f2eb8e66d0729f7d5b66cdcff47eda32ef8a6
Parents: 7c23e24
Author: Jacob Maes <ja...@gmail.com>
Authored: Tue Feb 9 23:30:21 2016 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Tue Feb 9 23:30:21 2016 -0800

----------------------------------------------------------------------
 .../job/yarn/AbstractContainerAllocator.java    | 33 +++++++-
 .../samza/job/yarn/ContainerAllocator.java      | 40 ++++-----
 .../samza/job/yarn/ContainerRequestState.java   | 58 ++++++-------
 .../job/yarn/HostAwareContainerAllocator.java   | 87 ++++++++------------
 4 files changed, 110 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/495f2eb8/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
index 9ee2dac..2e192ee 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
@@ -24,6 +24,9 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.samza.config.YarnConfig;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 /**
  * This class is responsible for making requests for containers to the AM and also, assigning a container to run on an allocated resource.
@@ -34,6 +37,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * See {@link org.apache.samza.job.yarn.ContainerAllocator} and {@link org.apache.samza.job.yarn.HostAwareContainerAllocator}
  */
 public abstract class AbstractContainerAllocator implements Runnable {
+  private static final Logger log = LoggerFactory.getLogger(AbstractContainerAllocator.class);
+
   public static final String ANY_HOST = ContainerRequestState.ANY_HOST;
   public static final int DEFAULT_PRIORITY = 0;
   public static final int DEFAULT_CONTAINER_MEM = 1024;
@@ -45,9 +50,6 @@ public abstract class AbstractContainerAllocator implements Runnable {
   protected final int containerMaxMemoryMb;
   protected final int containerMaxCpuCore;
 
-  @Override
-  public abstract void run();
-
   // containerRequestState indicate the state of all unfulfilled container requests and allocated containers
   protected final ContainerRequestState containerRequestState;
 
@@ -66,6 +68,31 @@ public abstract class AbstractContainerAllocator implements Runnable {
     this.containerMaxCpuCore = yarnConfig.getContainerMaxCpuCores();
   }
 
+  /**
+   * Continuously assigns requested containers to the allocated containers provided by the cluster manager.
+   * The loop frequency is governed by thread sleeps for ALLOCATOR_SLEEP_TIME ms.
+   *
+   * Terminates when the isRunning flag is cleared.
+   */
+  @Override
+  public void run() {
+    while(isRunning.get()) {
+      try {
+        assignContainerRequests();
+        Thread.sleep(ALLOCATOR_SLEEP_TIME);
+      } catch (InterruptedException e) {
+        log.info("Got InterruptedException in AllocatorThread.", e);
+      } catch (Exception e) {
+        log.error("Got unknown Exception in AllocatorThread.", e);
+      }
+    }
+  }
+
+  /**
+   * Assigns the container requests from the queue to the allocated containers from the cluster manager and
+   * runs them.
+   */
+  protected abstract void assignContainerRequests();
 
   /**
    * Called during initial request for containers

http://git-wip-us.apache.org/repos/asf/samza/blob/495f2eb8/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
index 7c57a86..31fcc57 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
@@ -18,13 +18,13 @@
  */
 package org.apache.samza.job.yarn;
 
+import java.util.List;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.samza.config.YarnConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.util.List;
 
 /**
  * This is the default allocator thread that will be used by SamzaTaskManager.
@@ -42,36 +42,28 @@ public class ContainerAllocator extends AbstractContainerAllocator {
   }
 
   /**
-   * During the run() method, the thread sleeps for ALLOCATOR_SLEEP_TIME ms. It tries to allocate any unsatisfied
-   * request that is still in the request queue (See requests in {@link org.apache.samza.job.yarn.ContainerRequestState})
+   * This method tries to allocate any unsatisfied request that is still in the request queue
+   * (See requests in {@link org.apache.samza.job.yarn.ContainerRequestState})
    * with allocated containers, if any.
    *
    * Since host-affinity is not enabled, all allocated container resources are buffered in the list keyed by "ANY_HOST".
    * */
   @Override
-  public void run() {
-    while(isRunning.get()) {
-      try {
-        List<Container> allocatedContainers = containerRequestState.getContainersOnAHost(ANY_HOST);
-        while (!containerRequestState.getRequestsQueue().isEmpty() && allocatedContainers != null && allocatedContainers.size() > 0) {
-          SamzaContainerRequest request = containerRequestState.getRequestsQueue().peek();
-          Container container = allocatedContainers.get(0);
-
-          // Update state
-          containerRequestState.updateStateAfterAssignment(request, ANY_HOST, container);
+  public void assignContainerRequests() {
+    List<Container> allocatedContainers = containerRequestState.getContainersOnAHost(ANY_HOST);
+    while (!containerRequestState.getRequestsQueue().isEmpty() && allocatedContainers != null && allocatedContainers.size() > 0) {
+      SamzaContainerRequest request = containerRequestState.getRequestsQueue().peek();
+      Container container = allocatedContainers.get(0);
 
-          // Cancel request and run container
-          log.info("Running {} on {}", request.expectedContainerId, container.getId());
-          containerUtil.runContainer(request.expectedContainerId, container);
-        }
+      // Update state
+      containerRequestState.updateStateAfterAssignment(request, ANY_HOST, container);
 
-        // If requestQueue is empty, all extra containers in the buffer should be released.
-        containerRequestState.releaseExtraContainers();
-
-        Thread.sleep(ALLOCATOR_SLEEP_TIME);
-      } catch (InterruptedException e) {
-        log.info("Got InterruptedException in AllocatorThread. Pending Container request(s) cannot be fulfilled!!", e);
-      }
+      // Cancel request and run container
+      log.info("Running {} on {}", request.expectedContainerId, container.getId());
+      containerUtil.runContainer(request.expectedContainerId, container);
     }
+
+    // If requestQueue is empty, all extra containers in the buffer should be released.
+    containerRequestState.releaseExtraContainers();
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/495f2eb8/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
index ab3061e..54db5e5 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
@@ -18,18 +18,17 @@
  */
 package org.apache.samza.job.yarn;
 
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class maintains the state variables for all the container requests and the allocated containers returned
@@ -205,35 +204,35 @@ public class ContainerRequestState {
   public synchronized int releaseExtraContainers() {
     int numReleasedContainers = 0;
 
-    if (hostAffinityEnabled) {
-      if (requestsQueue.isEmpty()) {
-        log.debug("Container Requests Queue is empty.");
+    if (requestsQueue.isEmpty()) {
+      log.debug("Container Requests Queue is empty.");
 
+      if (hostAffinityEnabled) {
         List<String> allocatedHosts = getAllocatedHosts();
         for (String host : allocatedHosts) {
-          List<Container> containers = getContainersOnAHost(host);
-          if (containers != null) {
-            for (Container c : containers) {
-              log.info("Releasing extra container {} allocated on {}", c.getId(), host);
-              amClient.releaseAssignedContainer(c.getId());
-              numReleasedContainers++;
-            }
-          }
+          numReleasedContainers += releaseContainersForHost(host);
         }
-        clearState();
+      } else {
+        numReleasedContainers += releaseContainersForHost(ANY_HOST);
       }
-    } else {
-      if (requestsQueue.isEmpty()) {
-        log.debug("No more pending requests in Container Requests Queue.");
+      clearState();
+    }
+    return numReleasedContainers;
+  }
 
-        List<Container> availableContainers = getContainersOnAHost(ANY_HOST);
-        while(availableContainers != null && !availableContainers.isEmpty()) {
-          Container c = availableContainers.remove(0);
-          log.info("Releasing extra allocated container - {}", c.getId());
-          amClient.releaseAssignedContainer(c.getId());
-          numReleasedContainers++;
-        }
-        clearState();
+  /**
+   * Releases all allocated containers for the specified host.
+   * @param host  the host for which the containers should be released.
+   * @return      the number of containers released.
+   */
+  private int releaseContainersForHost(String host) {
+    int numReleasedContainers = 0;
+    List<Container> containers = getContainersOnAHost(host);
+    if (containers != null) {
+      for (Container c : containers) {
+        log.info("Releasing extra container {} allocated on {}", c.getId(), host);
+        amClient.releaseAssignedContainer(c.getId());
+        numReleasedContainers++;
       }
     }
     return numReleasedContainers;
@@ -242,6 +241,7 @@ public class ContainerRequestState {
   /**
    * Clears all the state variables
    * Performed when there are no more unfulfilled requests
+   * This is not synchronized because it is private.
    */
   private void clearState() {
     allocatedContainers.clear();

http://git-wip-us.apache.org/repos/asf/samza/blob/495f2eb8/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
index ff22dbf..8e1db77 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
@@ -18,13 +18,13 @@
  */
 package org.apache.samza.job.yarn;
 
+import java.util.List;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.samza.config.YarnConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.util.List;
 
 /**
  * This is the allocator thread that will be used by SamzaTaskManager when host-affinity is enabled for a job. It is similar to {@link org.apache.samza.job.yarn.ContainerAllocator}, except that it considers container locality for allocation.
@@ -55,66 +55,49 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator {
    * allocatedContainers buffer keyed by "ANY_HOST".
    */
   @Override
-  public void run() {
-    try {
-      while (isRunning.get()) {
-        while (!containerRequestState.getRequestsQueue().isEmpty()) {
-          SamzaContainerRequest request = containerRequestState.getRequestsQueue().peek();
-          String preferredHost = request.getPreferredHost();
-          int expectedContainerId = request.getExpectedContainerId();
+  public void assignContainerRequests() {
+    while (!containerRequestState.getRequestsQueue().isEmpty()) {
+      SamzaContainerRequest request = containerRequestState.getRequestsQueue().peek();
+      String preferredHost = request.getPreferredHost();
+      int expectedContainerId = request.getExpectedContainerId();
 
-          log.info(
-              "Handling request for container id {} on preferred host {}",
-              expectedContainerId,
-              preferredHost);
+      log.info("Handling request for container id {} on preferred host {}", expectedContainerId, preferredHost);
 
-          List<Container> allocatedContainers = containerRequestState.getContainersOnAHost(preferredHost);
-          if (allocatedContainers != null && allocatedContainers.size() > 0) {
-            // Found allocated container at preferredHost
-            Container container = allocatedContainers.get(0);
+      List<Container> allocatedContainers = containerRequestState.getContainersOnAHost(preferredHost);
+      if (allocatedContainers != null && allocatedContainers.size() > 0) {
+        // Found allocated container at preferredHost
+        Container container = allocatedContainers.get(0);
 
-            containerRequestState.updateStateAfterAssignment(request, preferredHost, container);
+        containerRequestState.updateStateAfterAssignment(request, preferredHost, container);
 
+        log.info("Running {} on {}", expectedContainerId, container.getId());
+        containerUtil.runMatchedContainer(expectedContainerId, container);
+      } else {
+        // No allocated container on preferredHost
+        log.info("Did not find any allocated containers on preferred host {} for running container id {}",
+            preferredHost, expectedContainerId);
+        boolean expired = requestExpired(request);
+        allocatedContainers = containerRequestState.getContainersOnAHost(ANY_HOST);
+        if (!expired || allocatedContainers == null || allocatedContainers.size() == 0) {
+          log.info("Either the request timestamp {} is greater than container request timeout {}ms or we couldn't "
+                  + "find any free allocated containers in the buffer. Breaking out of loop.",
+              request.getRequestTimestamp(), CONTAINER_REQUEST_TIMEOUT);
+          break;
+        } else {
+          if (allocatedContainers.size() > 0) {
+            Container container = allocatedContainers.get(0);
+            log.info("Found available containers on ANY_HOST. Assigning request for container_id {} with "
+                    + "timestamp {} to container {}",
+                new Object[]{String.valueOf(expectedContainerId), request.getRequestTimestamp(), container.getId()});
+            containerRequestState.updateStateAfterAssignment(request, ANY_HOST, container);
             log.info("Running {} on {}", expectedContainerId, container.getId());
-            containerUtil.runMatchedContainer(expectedContainerId, container);
-          } else {
-            // No allocated container on preferredHost
-            log.info(
-                "Did not find any allocated containers on preferred host {} for running container id {}",
-                preferredHost,
-                expectedContainerId);
-            boolean expired = requestExpired(request);
-            allocatedContainers = containerRequestState.getContainersOnAHost(ANY_HOST);
-            if (!expired || allocatedContainers == null || allocatedContainers.size() == 0) {
-              log.info("Either the request timestamp {} is greater than container request timeout {}ms or we couldn't " +
-                      "find any free allocated containers in the buffer. Breaking out of loop.",
-                  request.getRequestTimestamp(),
-                  CONTAINER_REQUEST_TIMEOUT);
-              break;
-            } else {
-              if (allocatedContainers.size() > 0) {
-                Container container = allocatedContainers.get(0);
-                log.info("Found available containers on ANY_HOST. Assigning request for container_id {} with " +
-                        "timestamp {} to container {}",
-                    new Object[] { String.valueOf(expectedContainerId), request.getRequestTimestamp(), container.getId()
-                });
-                containerRequestState.updateStateAfterAssignment(request, ANY_HOST, container);
-                log.info("Running {} on {}", expectedContainerId, container.getId());
-                containerUtil.runContainer(expectedContainerId, container);
-              }
-            }
+            containerUtil.runContainer(expectedContainerId, container);
           }
         }
-        // Release extra containers and update the entire system's state
-        containerRequestState.releaseExtraContainers();
-
-        Thread.sleep(ALLOCATOR_SLEEP_TIME);
       }
-    } catch (InterruptedException ie) {
-      log.info("Got an InterruptedException in HostAwareContainerAllocator thread!", ie);
-    } catch (Exception e) {
-      log.info("Got an unknown Exception in HostAwareContainerAllocator thread!", e);
     }
+    // Release extra containers and update the entire system's state
+    containerRequestState.releaseExtraContainers();
   }
 
   private boolean requestExpired(SamzaContainerRequest request) {


[2/7] samza git commit: SAMZA-862 : change log level to warn for RocksDB with TTL and changelog

Posted by ni...@apache.org.
SAMZA-862 : change log level to warn for RocksDB with TTL and changelog


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6a5d49b8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6a5d49b8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6a5d49b8

Branch: refs/heads/samza-sql
Commit: 6a5d49b84939389cbfebd50682863ae4ee865ec0
Parents: 73a9395
Author: Boris Shkolnik <bo...@apache.org>
Authored: Wed Feb 3 15:19:46 2016 -0800
Committer: Navina <na...@gmail.com>
Committed: Wed Feb 3 15:20:00 2016 -0800

----------------------------------------------------------------------
 .../scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/6a5d49b8/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index 211fc3b..d614f8a 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -52,7 +52,7 @@ object RocksDbKeyValueStore extends Logging {
         useTTL = true
         if (isLoggedStore)
         {
-          error("%s is a TTL based store, changelog is not supported for TTL based stores, use at your own discretion" format storeName)
+          warn("%s is a TTL based store, changelog is not supported for TTL based stores, use at your own discretion" format storeName)
         }
       }
       catch