You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/03/26 16:06:16 UTC
[airavata] branch develop updated: Adding job id as the key of
kafka message entity to make sure all message types for a particular job go
to same partition
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/develop by this push:
new 797027c Adding job id as the key of kafka message entity to make sure all message types for a particular job go to same partition
797027c is described below
commit 797027c59ffb12bfd568761f6e767dac24f298d3
Author: dimuthu <di...@gmail.com>
AuthorDate: Mon Mar 26 12:06:10 2018 -0400
Adding job id as the key of kafka message entity to make sure all message types for a particular job go to same partition
---
.../airavata/monitor/kafka/MessageProducer.java | 1 +
.../monitor/kafka/JobStatusResultDeserializer.java | 53 --------------------
.../monitor/kafka/JobStatusResultSerializer.java | 48 -------------------
.../job/monitor/kafka/MessageProducer.java | 56 ----------------------
4 files changed, 1 insertion(+), 157 deletions(-)
diff --git a/modules/job-monitor/job-monitor-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java b/modules/job-monitor/job-monitor-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java
index 8079e4c..921c329 100644
--- a/modules/job-monitor/job-monitor-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java
+++ b/modules/job-monitor/job-monitor-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java
@@ -51,6 +51,7 @@ public class MessageProducer {
public void submitMessageToQueue(JobStatusResult jobStatusResult) throws ExecutionException, InterruptedException, ApplicationSettingsException {
final ProducerRecord<String, JobStatusResult> record = new ProducerRecord<>(
ServerSettings.getSetting("job.monitor.broker.topic"),
+ jobStatusResult.getJobId(),
jobStatusResult);
RecordMetadata recordMetadata = producer.send(record).get();
producer.flush();
diff --git a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultDeserializer.java b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultDeserializer.java
deleted file mode 100644
index b331e31..0000000
--- a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultDeserializer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.job.monitor.kafka;
-
-import org.apache.airavata.job.monitor.parser.JobStatusResult;
-import org.apache.airavata.model.status.JobState;
-import org.apache.kafka.common.serialization.Deserializer;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectInputStream;
-import java.util.Map;
-
-public class JobStatusResultDeserializer implements Deserializer<JobStatusResult> {
- @Override
- public void configure(Map<String, ?> map, boolean b) {
-
- }
-
- @Override
- public JobStatusResult deserialize(String s, byte[] bytes) {
- String deserializedData = new String(bytes);
- String[] parts = deserializedData.split(",");
- JobStatusResult jobStatusResult = new JobStatusResult();
- jobStatusResult.setJobId(parts[0]);
- jobStatusResult.setJobName(parts[1]);
- jobStatusResult.setState(JobState.valueOf(parts[2]));
- return jobStatusResult;
- }
-
- @Override
- public void close() {
-
- }
-}
diff --git a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultSerializer.java b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultSerializer.java
deleted file mode 100644
index d73d7aa..0000000
--- a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultSerializer.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.job.monitor.kafka;
-
-import org.apache.airavata.job.monitor.parser.JobStatusResult;
-import org.apache.kafka.common.serialization.Serializer;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
-import java.util.Map;
-
-public class JobStatusResultSerializer implements Serializer<JobStatusResult> {
-
- @Override
- public void configure(Map<String, ?> map, boolean b) {
-
- }
-
- @Override
- public byte[] serialize(String s, JobStatusResult jobStatusResult) {
- String serializedData = jobStatusResult.getJobId() + "," + jobStatusResult.getJobName() + "," + jobStatusResult.getState().name();
- return serializedData.getBytes();
- }
-
- @Override
- public void close() {
-
- }
-}
diff --git a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/MessageProducer.java b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/MessageProducer.java
deleted file mode 100644
index d1021f4..0000000
--- a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/MessageProducer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.job.monitor.kafka;
-
-import org.apache.airavata.job.monitor.parser.JobStatusResult;
-import org.apache.kafka.clients.producer.*;
-import org.apache.kafka.common.serialization.StringSerializer;
-
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-
-public class MessageProducer {
- private final static String TOPIC = "parsed-data";
- private final static String BOOTSTRAP_SERVERS = "localhost:9092";
-
- final Producer<String, JobStatusResult> producer;
-
- public MessageProducer() {
- producer = createProducer();
- }
-
- private Producer<String, JobStatusResult> createProducer() {
- Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
- BOOTSTRAP_SERVERS);
- props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
- StringSerializer.class.getName());
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
- JobStatusResultSerializer.class.getName());
- return new KafkaProducer<String, JobStatusResult>(props);
- }
-
- public void submitMessageToQueue(JobStatusResult jobStatusResult) throws ExecutionException, InterruptedException {
- final ProducerRecord<String, JobStatusResult> record = new ProducerRecord<>(TOPIC, jobStatusResult);
- RecordMetadata recordMetadata = producer.send(record).get();
- producer.flush();
- }
-}
--
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.