You are viewing a plain text version of this content; the canonical link to the original message was removed during plain-text conversion.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/02/03 08:21:06 UTC
[camel] 02/03: Camel-Weka: First batch of CS fixes
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 36e79ae5f4b45038fe5f45cd62611bc4d6f87fe6
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Feb 3 09:18:00 2020 +0100
Camel-Weka: First batch of CS fixes
---
.../apache/camel/component/weka/WekaComponent.java | 4 +-
.../camel/component/weka/WekaConfiguration.java | 4 +-
.../apache/camel/component/weka/WekaEndpoint.java | 5 +-
.../apache/camel/component/weka/WekaProducer.java | 76 +++++++++++-----------
.../camel/component/weka/WekaTypeConverters.java | 6 +-
.../apache/camel/component/weka/FilterTest.java | 76 +++++++++++-----------
.../apache/camel/component/weka/ReadWriteTest.java | 75 ++++++++++-----------
7 files changed, 120 insertions(+), 126 deletions(-)
diff --git a/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaComponent.java b/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaComponent.java
index 57405a1..acdecc7 100644
--- a/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaComponent.java
+++ b/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaComponent.java
@@ -32,10 +32,10 @@ public class WekaComponent extends DefaultComponent {
WekaConfiguration config = new WekaConfiguration();
WekaEndpoint endpoint = new WekaEndpoint(urispec, this, config);
setProperties(endpoint, params);
-
+
Command command = Command.valueOf(remaining);
config.setCommand(command);
-
+
return endpoint;
}
}
diff --git a/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaConfiguration.java b/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaConfiguration.java
index 39046b8..7aea74d 100644
--- a/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaConfiguration.java
+++ b/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaConfiguration.java
@@ -25,7 +25,7 @@ public class WekaConfiguration {
// Available commands
public enum Command {
- filter, read, write, version
+ filter, read, write, version
}
@UriPath(description = "The filter command")
@@ -38,7 +38,7 @@ public class WekaConfiguration {
private String path;
private Command command;
-
+
Command getCommand() {
return command;
}
diff --git a/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaEndpoint.java b/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaEndpoint.java
index 3685ae5..251cbd1 100644
--- a/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaEndpoint.java
+++ b/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaEndpoint.java
@@ -30,12 +30,11 @@ import weka.core.Version;
/**
* The camel-weka component provides Data Mining functionality through Weka.
*/
-@UriEndpoint(firstVersion = "3.1.0", scheme = "weka", title = "Weka",
- syntax = "weka:cmd", producerOnly = true, label = "Datamining")
+@UriEndpoint(firstVersion = "3.1.0", scheme = "weka", title = "Weka", syntax = "weka:cmd", producerOnly = true, label = "Datamining")
public class WekaEndpoint extends DefaultEndpoint {
static final Logger LOG = LoggerFactory.getLogger(WekaEndpoint.class);
-
+
@UriParam
private final WekaConfiguration configuration;
diff --git a/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaProducer.java b/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaProducer.java
index f8e9493..c99f5ac 100644
--- a/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaProducer.java
+++ b/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaProducer.java
@@ -61,7 +61,7 @@ public class WekaProducer extends DefaultProducer {
WekaEndpoint endpoint = getEndpoint();
Command cmd = getConfiguration().getCommand();
-
+
if (Command.version == cmd) {
Message msg = exchange.getMessage();
@@ -81,41 +81,41 @@ public class WekaProducer extends DefaultProducer {
Message msg = exchange.getMessage();
msg.setBody(handleFilterCmd(exchange));
-
+
}
}
private Dataset handleReadCmd(Exchange exchange) throws Exception {
-
+
String fpath = getConfiguration().getPath();
-
+
if (fpath != null) {
Dataset dataset = Dataset.create(fpath);
return dataset;
}
-
+
Dataset dataset = assertDatasetBody(exchange);
return dataset;
}
private Object handleWriteCmd(Exchange exchange) throws Exception {
-
+
Dataset dataset = assertDatasetBody(exchange);
String fpath = getConfiguration().getPath();
-
+
if (fpath != null) {
-
+
dataset.write(Paths.get(fpath));
return dataset;
-
+
} else {
-
- // The internal implementation of DataSink does this..
+
+ // The internal implementation of DataSink does this..
// Instances.toString().getBytes()
//
// Therefore, we avoid creating yet another copy of the
// instance data and call Instances.toString() as well
-
+
Instances instances = dataset.getInstances();
byte[] bytes = instances.toString().getBytes();
return new ByteArrayInputStream(bytes);
@@ -123,63 +123,63 @@ public class WekaProducer extends DefaultProducer {
}
private Dataset handleFilterCmd(Exchange exchange) throws Exception {
-
+
String applyValue = getConfiguration().getApply();
Dataset dataset = assertDatasetBody(exchange);
dataset = dataset.apply(applyValue);
-
+
return dataset;
}
private Dataset assertDatasetBody(Exchange exchange) throws Exception {
-
+
Message msg = exchange.getMessage();
Object body = msg.getBody();
-
+
Dataset dataset = msg.getBody(Dataset.class);
-
+
if (dataset == null) {
-
+
if (body instanceof Instances) {
- dataset = Dataset.create((Instances) body);
-
+ dataset = Dataset.create((Instances)body);
+
} else if (body instanceof GenericFile) {
-
- GenericFile<?> file = (GenericFile<?>) body;
+
+ GenericFile<?> file = (GenericFile<?>)body;
AssertState.isFalse(file.isDirectory(), "Directory not supported: " + file);
String absolutePath = file.getAbsoluteFilePath();
dataset = Dataset.create(absolutePath);
-
+
} else if (body instanceof URL) {
-
- URL url = (URL) body;
+
+ URL url = (URL)body;
Instances instances = readInternal(url.openStream());
dataset = Dataset.create(instances);
-
+
} else if (body instanceof InputStream) {
-
- InputStream input = (InputStream) body;
+
+ InputStream input = (InputStream)body;
Instances instances = readInternal(input);
dataset = Dataset.create(instances);
}
}
-
+
AssertState.notNull(dataset, "Cannot obtain dataset from body: " + body);
return dataset;
}
// https://github.com/tdiesler/nessus-weka/issues/11
private static Instances readInternal(InputStream input) {
-
+
Instances instances = null;
-
+
try {
-
+
if (input.markSupported())
input.mark(10240);
-
+
// First try .arff
try {
Loader loader = new ArffLoader();
@@ -195,13 +195,13 @@ public class WekaProducer extends DefaultProducer {
input.reset();
}
}
-
+
// Next try .csv
if (instances == null) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(baos));
-
+
try (BufferedReader br = new BufferedReader(new InputStreamReader(input))) {
String line = br.readLine();
while (line != null) {
@@ -215,17 +215,17 @@ public class WekaProducer extends DefaultProducer {
}
input = new ByteArrayInputStream(baos.toByteArray());
-
+
Loader loader = new CSVLoader();
loader.setSource(input);
loader.getStructure();
instances = loader.getDataSet();
}
-
+
} catch (Exception ex) {
throw UncheckedException.create(ex);
}
-
+
return instances;
}
}
diff --git a/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaTypeConverters.java b/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaTypeConverters.java
index 30f7ea1..4004209 100644
--- a/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaTypeConverters.java
+++ b/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaTypeConverters.java
@@ -28,14 +28,14 @@ import weka.core.Instances;
public class WekaTypeConverters {
@Converter
- public static InputStream toInputStream(Dataset dataset) {
+ public static InputStream toInputStream(Dataset dataset) {
Instances instances = dataset.getInstances();
return toInputStream(instances);
}
@Converter
- public static InputStream toInputStream(Instances instances) {
+ public static InputStream toInputStream(Instances instances) {
byte[] bytes = instances.toString().getBytes();
return new ByteArrayInputStream(bytes);
}
- }
+}
diff --git a/components/camel-weka/src/test/java/org/apache/camel/component/weka/FilterTest.java b/components/camel-weka/src/test/java/org/apache/camel/component/weka/FilterTest.java
index c0fe527..f48c48d 100644
--- a/components/camel-weka/src/test/java/org/apache/camel/component/weka/FilterTest.java
+++ b/components/camel-weka/src/test/java/org/apache/camel/component/weka/FilterTest.java
@@ -37,35 +37,35 @@ public class FilterTest {
public void readFromFileFilterAndWrite() throws Exception {
try (CamelContext camelctx = new DefaultCamelContext()) {
-
+
camelctx.addRoutes(new RouteBuilder() {
-
+
@Override
public void configure() throws Exception {
-
+
// Use the file component to read the CSV file
from("file:src/test/resources/data?fileName=sfny.csv&noop=true")
-
- // Convert the 'in_sf' attribute to nominal
- .to("weka:filter?apply=NumericToNominal -R first")
-
- // Move the 'in_sf' attribute to the end
- .to("weka:filter?apply=Reorder -R 2-last,1")
-
- // Rename the relation
- .to("weka:filter?apply=RenameRelation -modify sfny")
-
- // Use the file component to write the Arff file
- .to("file:target/data?fileName=sfny.arff")
-
- .to("direct:end");
+
+ // Convert the 'in_sf' attribute to nominal
+ .to("weka:filter?apply=NumericToNominal -R first")
+
+ // Move the 'in_sf' attribute to the end
+ .to("weka:filter?apply=Reorder -R 2-last,1")
+
+ // Rename the relation
+ .to("weka:filter?apply=RenameRelation -modify sfny")
+
+ // Use the file component to write the Arff file
+ .to("file:target/data?fileName=sfny.arff")
+
+ .to("direct:end");
}
});
camelctx.start();
-
+
ConsumerTemplate consumer = camelctx.createConsumerTemplate();
consumer.receiveBody("direct:end");
-
+
Path inpath = Paths.get("target/data/sfny.arff");
Instances instances = DatasetUtils.read(inpath);
Assert.assertEquals("sfny", instances.relationName());
@@ -76,28 +76,28 @@ public class FilterTest {
public void readWithWekaFilterAndWrite() throws Exception {
try (CamelContext camelctx = new DefaultCamelContext()) {
-
+
camelctx.addRoutes(new RouteBuilder() {
-
+
@Override
public void configure() throws Exception {
-
+
from("direct:start")
-
- // Use Weka to read the CSV file
- .to("weka:read?path=src/test/resources/data/sfny.csv")
-
- // Convert the 'in_sf' attribute to nominal
- .to("weka:filter?apply=NumericToNominal -R first")
-
- // Move the 'in_sf' attribute to the end
- .to("weka:filter?apply=Reorder -R 2-last,1")
-
- // Rename the relation
- .to("weka:filter?apply=RenameRelation -modify sfny")
-
- // Use Weka to write the Arff file
- .to("weka:write?path=target/data/sfny.arff");
+
+ // Use Weka to read the CSV file
+ .to("weka:read?path=src/test/resources/data/sfny.csv")
+
+ // Convert the 'in_sf' attribute to nominal
+ .to("weka:filter?apply=NumericToNominal -R first")
+
+ // Move the 'in_sf' attribute to the end
+ .to("weka:filter?apply=Reorder -R 2-last,1")
+
+ // Rename the relation
+ .to("weka:filter?apply=RenameRelation -modify sfny")
+
+ // Use Weka to write the Arff file
+ .to("weka:write?path=target/data/sfny.arff");
}
});
camelctx.start();
@@ -105,7 +105,7 @@ public class FilterTest {
ProducerTemplate producer = camelctx.createProducerTemplate();
Dataset dataset = producer.requestBody("direct:start", null, Dataset.class);
Assert.assertEquals("sfny", dataset.getInstances().relationName());
-
+
dataset = Dataset.create("target/data/sfny.arff");
Assert.assertEquals("sfny", dataset.getInstances().relationName());
}
diff --git a/components/camel-weka/src/test/java/org/apache/camel/component/weka/ReadWriteTest.java b/components/camel-weka/src/test/java/org/apache/camel/component/weka/ReadWriteTest.java
index d50159e..eb64406 100644
--- a/components/camel-weka/src/test/java/org/apache/camel/component/weka/ReadWriteTest.java
+++ b/components/camel-weka/src/test/java/org/apache/camel/component/weka/ReadWriteTest.java
@@ -37,7 +37,7 @@ public class ReadWriteTest {
public void wekaVersion() throws Exception {
try (CamelContext camelctx = new DefaultCamelContext()) {
-
+
camelctx.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
@@ -45,38 +45,37 @@ public class ReadWriteTest {
}
});
camelctx.start();
-
+
ProducerTemplate producer = camelctx.createProducerTemplate();
String res = producer.requestBody("direct:start", null, String.class);
Assert.assertTrue(res.startsWith("3.8"));
}
}
-
+
@Test
public void readCsvFile() throws Exception {
try (CamelContext camelctx = new DefaultCamelContext()) {
-
+
camelctx.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("file:src/test/resources/data?fileName=sfny.csv&noop=true")
- .to("weka:read").to("direct:end");
+ from("file:src/test/resources/data?fileName=sfny.csv&noop=true").to("weka:read").to("direct:end");
}
});
camelctx.start();
-
+
ConsumerTemplate consumer = camelctx.createConsumerTemplate();
Dataset dataset = consumer.receiveBody("direct:end", Dataset.class);
Assert.assertNotNull(dataset);
}
}
-
+
@Test
public void readCsvUrl() throws Exception {
try (CamelContext camelctx = new DefaultCamelContext()) {
-
+
camelctx.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
@@ -84,21 +83,21 @@ public class ReadWriteTest {
}
});
camelctx.start();
-
+
Path absPath = Paths.get("src/test/resources/data/sfny.csv").toAbsolutePath();
URL sourceUrl = absPath.toUri().toURL();
-
+
ProducerTemplate producer = camelctx.createProducerTemplate();
Dataset dataset = producer.requestBody("direct:start", sourceUrl, Dataset.class);
Assert.assertNotNull(dataset);
}
}
-
+
@Test
public void readCsvInputStream() throws Exception {
try (CamelContext camelctx = new DefaultCamelContext()) {
-
+
camelctx.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
@@ -106,10 +105,10 @@ public class ReadWriteTest {
}
});
camelctx.start();
-
+
Path absPath = Paths.get("src/test/resources/data/sfny.csv").toAbsolutePath();
InputStream input = absPath.toUri().toURL().openStream();
-
+
ProducerTemplate producer = camelctx.createProducerTemplate();
Dataset dataset = producer.requestBody("direct:start", input, Dataset.class);
Assert.assertNotNull(dataset);
@@ -120,7 +119,7 @@ public class ReadWriteTest {
public void readArffWithPath() throws Exception {
try (CamelContext camelctx = new DefaultCamelContext()) {
-
+
camelctx.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
@@ -128,7 +127,7 @@ public class ReadWriteTest {
}
});
camelctx.start();
-
+
ProducerTemplate producer = camelctx.createProducerTemplate();
Dataset dataset = producer.requestBody("direct:start", null, Dataset.class);
Assert.assertNotNull(dataset);
@@ -139,7 +138,7 @@ public class ReadWriteTest {
public void readArffInputStream() throws Exception {
try (CamelContext camelctx = new DefaultCamelContext()) {
-
+
camelctx.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
@@ -147,10 +146,10 @@ public class ReadWriteTest {
}
});
camelctx.start();
-
+
Path absPath = Paths.get("src/test/resources/data/sfny.arff").toAbsolutePath();
InputStream input = absPath.toUri().toURL().openStream();
-
+
ProducerTemplate producer = camelctx.createProducerTemplate();
Dataset dataset = producer.requestBody("direct:start", input, Dataset.class);
Assert.assertNotNull(dataset);
@@ -161,22 +160,21 @@ public class ReadWriteTest {
public void writeDatasetWithConversion() throws Exception {
try (CamelContext camelctx = new DefaultCamelContext()) {
-
+
camelctx.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("direct:start")
- .to("file:target/data?fileName=sfny.arff");
+ from("direct:start").to("file:target/data?fileName=sfny.arff");
}
});
camelctx.start();
-
+
Path inpath = Paths.get("src/test/resources/data/sfny.arff");
Dataset dataset = Dataset.create(inpath);
-
+
ProducerTemplate producer = camelctx.createProducerTemplate();
producer.sendBody("direct:start", dataset);
-
+
Path outpath = Paths.get("target/data/sfny.arff");
dataset = Dataset.create(outpath);
Assert.assertNotNull(dataset);
@@ -187,23 +185,21 @@ public class ReadWriteTest {
public void writeDatasetWithoutPath() throws Exception {
try (CamelContext camelctx = new DefaultCamelContext()) {
-
+
camelctx.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("direct:start")
- .to("weka:write")
- .to("file:target/data?fileName=sfny.arff");
+ from("direct:start").to("weka:write").to("file:target/data?fileName=sfny.arff");
}
});
camelctx.start();
-
+
Path inpath = Paths.get("src/test/resources/data/sfny.arff");
Dataset dataset = Dataset.create(inpath);
-
+
ProducerTemplate producer = camelctx.createProducerTemplate();
producer.sendBody("direct:start", dataset);
-
+
Path outpath = Paths.get("target/data/sfny.arff");
dataset = Dataset.create(outpath);
Assert.assertNotNull(dataset);
@@ -214,26 +210,25 @@ public class ReadWriteTest {
public void writeDatasetWithPath() throws Exception {
try (CamelContext camelctx = new DefaultCamelContext()) {
-
+
camelctx.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("direct:start")
- .to("weka:write?path=target/data/sfny.arff");
+ from("direct:start").to("weka:write?path=target/data/sfny.arff");
}
});
camelctx.start();
-
+
Path inpath = Paths.get("src/test/resources/data/sfny.arff");
Dataset dataset = Dataset.create(inpath);
-
+
ProducerTemplate producer = camelctx.createProducerTemplate();
producer.sendBody("direct:start", dataset);
-
+
Path outpath = Paths.get("target/data/sfny.arff");
dataset = Dataset.create(outpath);
Assert.assertNotNull(dataset);
}
}
-
+
}