You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:39 UTC
[23/50] [abbrv] samza git commit: SAMZA-769 - Replace deprecated
method call and fix warnings
SAMZA-769 - Replace deprecated method call and fix warnings
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0e94975e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0e94975e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0e94975e
Branch: refs/heads/samza-sql
Commit: 0e94975e522e40f7cb89e98bd923a071e4c07301
Parents: 74aa516
Author: Aleksandar Bircakovic <a....@levi9.com>
Authored: Wed Nov 18 12:43:44 2015 -0800
Committer: Navina <na...@gmail.com>
Committed: Wed Nov 18 12:43:44 2015 -0800
----------------------------------------------------------------------
.../java/org/apache/samza/system/SystemAdmin.java | 4 ++--
.../samza/autoscaling/deployer/ConfigManager.java | 5 +++--
.../org/apache/samza/autoscaling/utils/YarnUtil.java | 15 +++++++++------
.../apache/samza/config/JavaSerializerConfig.java | 4 ++--
.../org/apache/samza/storage/StorageRecovery.java | 1 +
.../stream/TestCoordinatorStreamWriter.java | 1 +
.../samza/storage/kv/RocksDbKeyValueReader.java | 2 ++
.../log4j/serializers/LoggingEventJsonSerde.java | 5 ++++-
8 files changed, 24 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/0e94975e/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index bc926c5..ef99893 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -84,8 +84,8 @@ public interface SystemAdmin {
* offset1 == offset2 and offset1 > offset2 respectively. Return
* null if those two offsets are not comparable
*
- * @param offset1
- * @param offset2
+ * @param offset1 First offset for comparison.
+ * @param offset2 Second offset for comparison.
* @return -1 if offset1 < offset2; 0 if offset1 == offset2; 1 if offset1 > offset2. Null if not comparable
*/
Integer offsetComparator(String offset1, String offset2);
http://git-wip-us.apache.org/repos/asf/samza/blob/0e94975e/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
----------------------------------------------------------------------
diff --git a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
index 7089796..87346bc 100644
--- a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
+++ b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
@@ -21,6 +21,7 @@
package org.apache.samza.autoscaling.deployer;
import joptsimple.OptionSet;
+
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.samza.autoscaling.utils.YarnUtil;
import org.apache.samza.config.Config;
@@ -152,7 +153,6 @@ public class ConfigManager {
coordinatorStreamConsumer.stop();
coordinatorServerURL = null;
yarnUtil.stop();
-
}
/**
@@ -201,6 +201,7 @@ public class ConfigManager {
* @param keysToProcess a list of keys to process. Only messages with these keys will call their handler function,
* and other messages will be skipped. If the list is empty all messages will be skipped.
*/
+ @SuppressWarnings("unchecked")
private void processConfigMessages(List<String> keysToProcess) {
if (!coordinatorStreamConsumer.hasNewMessages(coordinatorStreamIterator)) {
return;
@@ -360,7 +361,7 @@ public class ConfigManager {
* To run the code use the following command:
* {path to samza deployment}/samza/bin/run-config-manager.sh --config-factory={config-factory} --config-path={path to config file of a job}
*
- * @param args
+ * @param args input arguments for running ConfigManager.
*/
public static void main(String[] args) {
CommandLine cmdline = new CommandLine();
http://git-wip-us.apache.org/repos/asf/samza/blob/0e94975e/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java
----------------------------------------------------------------------
diff --git a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java
index b2d37a7..376c549 100644
--- a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java
+++ b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java
@@ -25,9 +25,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
@@ -44,13 +44,12 @@ import java.util.Map;
*/
public class YarnUtil {
private static final Logger log = LoggerFactory.getLogger(YarnUtil.class);
- private HttpClient httpclient;
+ private CloseableHttpClient httpclient;
private HttpHost rmServer;
private YarnClient yarnClient;
public YarnUtil(String rmAddress, int rmPort) {
-
- this.httpclient = new DefaultHttpClient();
+ this.httpclient = HttpClientBuilder.create().build();
this.rmServer = new HttpHost(rmAddress, rmPort, "http");
log.info("setting rm server to : " + rmServer);
YarnConfiguration hConfig = new YarnConfiguration();
@@ -146,7 +145,11 @@ public class YarnUtil {
* This function stops the YarnUtil by stopping the yarn client and http client.
*/
public void stop() {
- httpclient.getConnectionManager().shutdown();
+ try {
+ httpclient.close();
+ } catch (IOException e) {
+ log.error("HTTP Client failed to close.", e);
+ }
yarnClient.stop();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/0e94975e/samza-core/src/main/java/org/apache/samza/config/JavaSerializerConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaSerializerConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaSerializerConfig.java
index 7db3e1c..946d4e2 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JavaSerializerConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaSerializerConfig.java
@@ -38,8 +38,8 @@ public class JavaSerializerConfig extends MapConfig {
}
/**
- * Returns a list of all serializer names from the config file. Useful for
- * getting individual serializers.
+ * Useful for getting individual serializers.
+ * @return a list of all serializer names from the config file
*/
public List<String> getSerdeNames() {
List<String> results = new ArrayList<String>();
http://git-wip-us.apache.org/repos/asf/samza/blob/0e94975e/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index c564964..0324e90 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -204,6 +204,7 @@ public class StorageRecovery extends CommandLine {
* create one TaskStorageManager for each task. Add all of them to the
* List<TaskStorageManager>
*/
+ @SuppressWarnings({ "unchecked", "rawtypes" })
private void getTaskStorageManagers() {
StreamMetadataCache streamMetadataCache = new StreamMetadataCache(Util.javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance());
http://git-wip-us.apache.org/repos/asf/samza/blob/0e94975e/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
index 4eaaec2..f9c6304 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
@@ -105,6 +105,7 @@ public class TestCoordinatorStreamWriter {
assertTrue(systemProducer.isStopped());
}
+ @SuppressWarnings({ "unchecked", "rawtypes" })
public void testSendMessage() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
//check a correct message
http://git-wip-us.apache.org/repos/asf/samza/blob/0e94975e/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
index 6dcb407..f570422 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
@@ -86,6 +86,8 @@ public class RocksDbKeyValueReader {
* the db, it return null.
*
* @param key the key of the value you want to get
+ * @return deserialized value for the key
+ * Returns null, if the value doesn't exist
*/
public Object get(Object key) {
byte[] byteKey = keySerde.toBytes(key);
http://git-wip-us.apache.org/repos/asf/samza/blob/0e94975e/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventJsonSerde.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventJsonSerde.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventJsonSerde.java
index a18d8e0..129a4a0 100644
--- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventJsonSerde.java
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventJsonSerde.java
@@ -50,6 +50,7 @@ public class LoggingEventJsonSerde implements Serde<LoggingEvent> {
// Have to wrap rather than extend due to type collisions between
// Serde<LoggingEvent> and Serde<Object>.
+ @SuppressWarnings("rawtypes")
private final JsonSerde jsonSerde;
/**
@@ -68,15 +69,17 @@ public class LoggingEventJsonSerde implements Serde<LoggingEvent> {
/**
* Constructs the serde.
- *
+ *
* @param includeLocationInfo
* Whether to include location info in the logging event or not.
*/
+ @SuppressWarnings("rawtypes")
public LoggingEventJsonSerde(boolean includeLocationInfo) {
this.includeLocationInfo = includeLocationInfo;
this.jsonSerde = new JsonSerde();
}
+ @SuppressWarnings("unchecked")
@Override
public byte[] toBytes(LoggingEvent loggingEvent) {
Map<String, Object> loggingEventMap = encodeToMap(loggingEvent, includeLocationInfo);