You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/03/26 16:06:16 UTC

[airavata] branch develop updated: Adding the job ID as the key of the Kafka message entity to make sure all message types for a particular job go to the same partition

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/develop by this push:
     new 797027c  Adding the job ID as the key of the Kafka message entity to make sure all message types for a particular job go to the same partition
797027c is described below

commit 797027c59ffb12bfd568761f6e767dac24f298d3
Author: dimuthu <di...@gmail.com>
AuthorDate: Mon Mar 26 12:06:10 2018 -0400

    Adding the job ID as the key of the Kafka message entity to make sure all message types for a particular job go to the same partition
---
 .../airavata/monitor/kafka/MessageProducer.java    |  1 +
 .../monitor/kafka/JobStatusResultDeserializer.java | 53 --------------------
 .../monitor/kafka/JobStatusResultSerializer.java   | 48 -------------------
 .../job/monitor/kafka/MessageProducer.java         | 56 ----------------------
 4 files changed, 1 insertion(+), 157 deletions(-)

diff --git a/modules/job-monitor/job-monitor-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java b/modules/job-monitor/job-monitor-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java
index 8079e4c..921c329 100644
--- a/modules/job-monitor/job-monitor-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java
+++ b/modules/job-monitor/job-monitor-api/src/main/java/org/apache/airavata/monitor/kafka/MessageProducer.java
@@ -51,6 +51,7 @@ public class MessageProducer {
     public void submitMessageToQueue(JobStatusResult jobStatusResult) throws ExecutionException, InterruptedException, ApplicationSettingsException {
         final ProducerRecord<String, JobStatusResult> record = new ProducerRecord<>(
                 ServerSettings.getSetting("job.monitor.broker.topic"),
+                jobStatusResult.getJobId(),
                 jobStatusResult);
         RecordMetadata recordMetadata = producer.send(record).get();
         producer.flush();
diff --git a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultDeserializer.java b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultDeserializer.java
deleted file mode 100644
index b331e31..0000000
--- a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultDeserializer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.job.monitor.kafka;
-
-import org.apache.airavata.job.monitor.parser.JobStatusResult;
-import org.apache.airavata.model.status.JobState;
-import org.apache.kafka.common.serialization.Deserializer;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectInputStream;
-import java.util.Map;
-
-public class JobStatusResultDeserializer implements Deserializer<JobStatusResult> {
-    @Override
-    public void configure(Map<String, ?> map, boolean b) {
-
-    }
-
-    @Override
-    public JobStatusResult deserialize(String s, byte[] bytes) {
-        String deserializedData = new String(bytes);
-        String[] parts = deserializedData.split(",");
-        JobStatusResult jobStatusResult = new JobStatusResult();
-        jobStatusResult.setJobId(parts[0]);
-        jobStatusResult.setJobName(parts[1]);
-        jobStatusResult.setState(JobState.valueOf(parts[2]));
-        return jobStatusResult;
-    }
-
-    @Override
-    public void close() {
-
-    }
-}
diff --git a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultSerializer.java b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultSerializer.java
deleted file mode 100644
index d73d7aa..0000000
--- a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/JobStatusResultSerializer.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.job.monitor.kafka;
-
-import org.apache.airavata.job.monitor.parser.JobStatusResult;
-import org.apache.kafka.common.serialization.Serializer;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
-import java.util.Map;
-
-public class JobStatusResultSerializer implements Serializer<JobStatusResult> {
-
-    @Override
-    public void configure(Map<String, ?> map, boolean b) {
-
-    }
-
-    @Override
-    public byte[] serialize(String s, JobStatusResult jobStatusResult) {
-        String serializedData = jobStatusResult.getJobId() + "," + jobStatusResult.getJobName() + "," + jobStatusResult.getState().name();
-        return serializedData.getBytes();
-    }
-
-    @Override
-    public void close() {
-
-    }
-}
diff --git a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/MessageProducer.java b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/MessageProducer.java
deleted file mode 100644
index d1021f4..0000000
--- a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/MessageProducer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.job.monitor.kafka;
-
-import org.apache.airavata.job.monitor.parser.JobStatusResult;
-import org.apache.kafka.clients.producer.*;
-import org.apache.kafka.common.serialization.StringSerializer;
-
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-
-public class MessageProducer {
-    private final static String TOPIC = "parsed-data";
-    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
-
-    final Producer<String, JobStatusResult> producer;
-
-    public MessageProducer() {
-        producer = createProducer();
-    }
-
-    private Producer<String, JobStatusResult> createProducer() {
-        Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
-                BOOTSTRAP_SERVERS);
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                StringSerializer.class.getName());
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                JobStatusResultSerializer.class.getName());
-        return new KafkaProducer<String, JobStatusResult>(props);
-    }
-
-    public void submitMessageToQueue(JobStatusResult jobStatusResult) throws ExecutionException, InterruptedException {
-        final ProducerRecord<String, JobStatusResult> record = new ProducerRecord<>(TOPIC, jobStatusResult);
-        RecordMetadata recordMetadata = producer.send(record).get();
-        producer.flush();
-    }
-}

-- 
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.