Posted to commits@camel.apache.org by da...@apache.org on 2020/02/03 15:29:49 UTC

[camel] branch master updated: [CAMEL-14468] Add support for classification in camel-weka (#3536)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new cf4f5ae  [CAMEL-14468] Add support for classification in camel-weka (#3536)
cf4f5ae is described below

commit cf4f5ae6a798dd86c93b9369f1246076db954a51
Author: Thomas Diesler <td...@redhat.com>
AuthorDate: Mon Feb 3 16:29:40 2020 +0100

    [CAMEL-14468] Add support for classification in camel-weka (#3536)
---
 .../camel-weka/src/main/docs/weka-component.adoc   | 148 +++++++-
 .../camel/component/weka/WekaConfiguration.java    |  98 ++++-
 .../apache/camel/component/weka/WekaEndpoint.java  |  19 +-
 .../apache/camel/component/weka/WekaProducer.java  | 270 +++++++-------
 .../camel/component/weka/WekaTypeConverters.java   |  36 +-
 .../camel/component/weka/DecisionTreeTest.java     | 142 ++++++++
 .../apache/camel/component/weka/FilterTest.java    |  40 +-
 .../camel/component/weka/PredictionTest.java       | 111 ++++++
 .../apache/camel/component/weka/ReadWriteTest.java |  30 +-
 .../src/test/resources/data/sfny-j48.model         | Bin 0 -> 7824 bytes
 .../src/test/resources/data/sfny-predicted.arff    | 111 ++++++
 .../src/test/resources/data/sfny-test.arff         | 111 ++++++
 .../src/test/resources/data/sfny-train.arff        | 405 +++++++++++++++++++++
 .../endpoint/dsl/WekaEndpointBuilderFactory.java   | 174 ++++++++-
 .../modules/ROOT/pages/weka-component.adoc         | 124 ++++++-
 parent/pom.xml                                     |   2 +-
 16 files changed, 1610 insertions(+), 211 deletions(-)

diff --git a/components/camel-weka/src/main/docs/weka-component.adoc b/components/camel-weka/src/main/docs/weka-component.adoc
index 6f30870..9425507 100644
--- a/components/camel-weka/src/main/docs/weka-component.adoc
+++ b/components/camel-weka/src/main/docs/weka-component.adoc
@@ -50,33 +50,45 @@ The Weka component supports 2 options, which are listed below.
 The Weka endpoint is configured using URI syntax:
 
 ----
-weka:cmd
+weka:cmd?options
 ----
 
 with the following path and query parameters:
 
-=== Path Parameters (2 parameters):
+=== Path Parameters (7 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
+| *read* | The read command |  | String
+| *write* | The write command |  | String
 | *filter* | The filter command |  | String
+| *model* | The model command |  | String
+| *push* | The push command |  | String
+| *pop* | The pop command |  | String
 | *version* | The version command |  | String
 |===
 
 
-=== Query Parameters (5 parameters):
+=== Query Parameters (12 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
-| *apply* (producer) | The Weka filter/classifier spec (i.e. Name Options) |  | String
 | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...]
-| *path* (producer) | An optional in/out path for the read/write commands |  | String
+| *path* (producer) | An in/out path for the read/write commands |  | String
 | *basicPropertyBinding* (advanced) | Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean
 | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
+| *apply* (filter) | The filter spec (i.e. Name Options) |  | String
+| *build* (model) | The classifier spec (i.e. Name Options) |  | String
+| *dsname* (model) | The named dataset to train the classifier with |  | String
+| *folds* (model) | Number of folds to use for cross-validation | 10 | int
+| *loadFrom* (model) | Path to load the model from |  | String
+| *saveTo* (model) | Path to save the model to |  | String
+| *seed* (model) | An optional seed for the randomizer | 1 | int
+| *xval* (model) | Flag on whether to use cross-validation with the current dataset | false | boolean
 |===
 // endpoint options: END
 // spring-boot-auto-configure options: START
@@ -118,9 +130,9 @@ This component is not supported in Karaf
 
 == Samples
 
-This first example shows how to read a CSV file with the file component and then 
-pass it on to Weka. In Weka we apply a few filters to the data set and then pass it on to
-the file component for writing. 
+=== Read + Filter + Write
+
+This first example shows how to read a CSV file with the file component and then pass it on to Weka. In Weka we apply a few filters to the data set and then pass it on to the file component for writing. 
 
 [source,java]
 ----
@@ -128,7 +140,7 @@ the file component for writing.
     public void configure() throws Exception {
         
         // Use the file component to read the CSV file
-        from("file:src/test/resources/data?fileName=sfny.csv&noop=true")
+        from("file:src/test/resources/data?fileName=sfny.csv")
         
         // Convert the 'in_sf' attribute to nominal
         .to("weka:filter?apply=NumericToNominal -R first")
@@ -141,19 +153,18 @@ the file component for writing.
         
         // Use the file component to write the Arff file
         .to("file:target/data?fileName=sfny.arff")
-        
-        .to("direct:end");
     }
 ----
 
-The second example does the same as above without use of the file component.
+Here we do the same as above without using the file component.
 
 [source,java]
 ----
     @Override
     public void configure() throws Exception {
         
-        from("direct:start")
+        // Initiate the route from somewhere
+        from("...")
         
         // Use Weka to read the CSV file
         .to("weka:read?path=src/test/resources/data/sfny.csv")
@@ -171,3 +182,114 @@ The second example does the same as above without use of the file component.
         .to("weka:write?path=target/data/sfny.arff");
     }
 ----
+
+In this example, the client would provide the input path or some other supported type.
+Have a look at `WekaTypeConverters` for the set of supported input types.
+
+[source,java]
+----
+    @Override
+    public void configure() throws Exception {
+        
+        // Initiate the route from somewhere
+        from("...")
+        
+        // Convert the 'in_sf' attribute to nominal
+        .to("weka:filter?apply=NumericToNominal -R first")
+        
+        // Move the 'in_sf' attribute to the end
+        .to("weka:filter?apply=Reorder -R 2-last,1")
+        
+        // Rename the relation
+        .to("weka:filter?apply=RenameRelation -modify sfny")
+        
+        // Use Weka to write the Arff file
+        .to("weka:write?path=target/data/sfny.arff");
+    }
+----
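+
+For illustration, a client could feed such a route with a `java.nio.file.Path` (assuming the route is started from a hypothetical `direct:start` endpoint); the `WekaTypeConverters` then turn the path into a `Dataset` before the first filter is applied.
+
+[source,java]
+----
+    // Send the input path to the route; the reply body is the filtered dataset
+    Path inpath = Paths.get("src/test/resources/data/sfny.csv");
+
+    ProducerTemplate producer = camelctx.createProducerTemplate();
+    Dataset dataset = producer.requestBody("direct:start", inpath, Dataset.class);
+----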
+
+=== Building a Model
+
+When building a model, we first choose the classification algorithm to use and then train it with some data. The result is the trained model that we can later use to classify unseen data.
+
+Here we train J48 with 10-fold cross-validation.
+
+[source,java]
+----
+try (CamelContext camelctx = new DefaultCamelContext()) {
+    
+    camelctx.addRoutes(new RouteBuilder() {
+        
+        @Override
+        public void configure() throws Exception {
+            
+            // Use the file component to read the training data
+            from("file:src/test/resources/data?fileName=sfny-train.arff")
+            
+            // Build a J48 classifier using cross-validation with 10 folds
+            .to("weka:model?build=J48&xval=true&folds=10&seed=1")
+                    
+            // Persist the J48 model
+            .to("weka:model?saveTo=src/test/resources/data/sfny-j48.model")
+        }
+    });
+    camelctx.start();
+}
+----
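+
+The message body that leaves the route is still a `Dataset`, carrying the trained classifier and the cross-validation result. As a sketch (assuming the route is triggered from a hypothetical `direct:start` endpoint instead of the file consumer), a client could inspect them like this:
+
+[source,java]
+----
+    Path inpath = Paths.get("src/test/resources/data/sfny-train.arff");
+
+    ProducerTemplate producer = camelctx.createProducerTemplate();
+    Dataset dataset = producer.requestBody("direct:start", inpath, Dataset.class);
+
+    // The trained J48 classifier
+    Classifier classifier = dataset.getClassifier();
+
+    // The cross-validation result
+    Evaluation evaluation = dataset.getEvaluation();
+----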
+
+=== Predicting a Class
+
+Here we use a `Processor` to access functionality that is not directly available from endpoint URIs.
+
+If you come here directly and this syntax looks a bit overwhelming, you might want to have a brief look at the https://tdiesler.github.io/nessus-weka/#_nessus_api_concepts[Nessus API Concepts] section.
+
+[source,java]
+----
+try (CamelContext camelctx = new DefaultCamelContext()) {
+    
+    camelctx.addRoutes(new RouteBuilder() {
+        
+        @Override
+        public void configure() throws Exception {
+            
+            // Use the file component to read the test data
+            from("file:src/test/resources/data?fileName=sfny-test.arff")
+            
+            // Remove the class attribute 
+            .to("weka:filter?apply=Remove -R last")
+            
+            // Add the 'prediction' placeholder attribute 
+            .to("weka:filter?apply=Add -N predicted -T NOM -L 0,1")
+            
+            // Rename the relation 
+            .to("weka:filter?apply=RenameRelation -modify sfny-predicted")
+            
+            // Load an already existing model
+            .to("weka:model?loadFrom=src/test/resources/data/sfny-j48.model")
+            
+            // Use a processor to do the prediction
+            .process(new Processor() {
+                public void process(Exchange exchange) throws Exception {
+                    Dataset dataset = exchange.getMessage().getBody(Dataset.class);
+                    dataset.applyToInstances(new NominalPredictor());
+                }
+            })
+                    
+            // Write the data file
+            .to("weka:write?path=src/test/resources/data/sfny-predicted.arff")
+        }
+    });
+    camelctx.start();
+}
+----
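+
+The `push` and `pop` commands stash the current instances under a name and retrieve them later, which is useful to compare the predictions against the original test data. A minimal sketch, following the component's `PredictionTest`: early in the route, push the unmodified test instances
+
+[source,java]
+----
+    // Stash the unmodified test instances under a name
+    .to("weka:push?dsname=sfny-test")
+----
+
+and later, on the `Dataset` returned from the route, pop them back for comparison.
+
+[source,java]
+----
+    // Retrieve the stashed instances and compare them with the predictions
+    Instances expdata = dataset.pop("sfny-test").getInstances();
+    Instances wasdata = dataset.getInstances();
+----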
+
+== Resources
+
+* https://www.cs.waikato.ac.nz/ml/weka/book.html[Practical Machine Learning Tools and Techniques,window=_blank]
+* https://www.cs.waikato.ac.nz/ml/weka/courses.html[Machine Learning Courses,window=_blank]
+* https://waikato.github.io/weka-wiki/documentation/[Weka Documentation,window=_blank]
+* https://tdiesler.github.io/nessus-weka[Nessus-Weka,window=_blank]
+
+
+
+
diff --git a/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaConfiguration.java b/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaConfiguration.java
index 7aea74d..20ffd0b 100644
--- a/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaConfiguration.java
+++ b/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaConfiguration.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.weka;
 
+import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
 import org.apache.camel.spi.UriPath;
@@ -25,18 +26,50 @@ public class WekaConfiguration {
 
     // Available commands
     public enum Command {
-        filter, read, write, version
+        filter, model, read, write, push, pop, version 
     }
 
+    // Not used, only here for documentation
+    
+    @UriPath(description = "The read command")
+    private String read;
+    @UriPath(description = "The write command")
+    private String write;
     @UriPath(description = "The filter command")
     private String filter;
+    @UriPath(description = "The model command")
+    private String model;
+    @UriPath(description = "The push command")
+    private String push;
+    @UriPath(description = "The pop command")
+    private String pop;
     @UriPath(description = "The version command")
     private String version;
-    @UriParam(description = "The Weka filter/classifier spec (i.e. Name [Options])")
-    private String apply;
-    @UriParam(description = "An optional in/out path for the read/write commands")
+    
+    // Read/Write parameters
+    @UriParam(description = "An in/out path for the read/write commands")
     private String path;
 
+    // Filter parameters
+    @UriParam(description = "The filter spec (i.e. Name [Options])", label = "filter")
+    private String apply;
+    
+    // Model parameters
+    @UriParam(description = "The classifier spec (i.e. Name [Options])", label = "model")
+    private String build;
+    @UriParam(description = "Flag on whether to use cross-validation with the current dataset", label = "model")
+    private boolean xval;
+    @UriParam(description = "The named dataset to train the classifier with", label = "model")
+    private String dsname;
+    @UriParam(description = "Numer of folds to use for cross-validation", label = "model") @Metadata(defaultValue = "10")
+    private int folds = 10;
+    @UriParam(description = "An optional seed for the randomizer", label = "model") @Metadata(defaultValue = "1")
+    private int seed = 1;
+    @UriParam(description = "Path to save the model to", label = "model")
+    private String saveTo;
+    @UriParam(description = "Path to load the model from", label = "model")
+    private String loadFrom;
+    
     private Command command;
 
     Command getCommand() {
@@ -55,6 +88,14 @@ public class WekaConfiguration {
         this.apply = apply;
     }
 
+    String getBuild() {
+        return build;
+    }
+
+    void setBuild(String build) {
+        this.build = build;
+    }
+
     String getPath() {
         return path;
     }
@@ -62,4 +103,53 @@ public class WekaConfiguration {
     void setPath(String path) {
         this.path = path;
     }
+
+    boolean isXval() {
+        return xval;
+    }
+
+    void setXval(boolean xval) {
+        this.xval = xval;
+    }
+
+    int getFolds() {
+        return folds;
+    }
+
+    void setFolds(int folds) {
+        this.folds = folds;
+    }
+
+    int getSeed() {
+        return seed;
+    }
+
+    void setSeed(int seed) {
+        this.seed = seed;
+    }
+
+    String getSaveTo() {
+        return saveTo;
+    }
+
+    void setSaveTo(String saveTo) {
+        this.saveTo = saveTo;
+    }
+
+    String getLoadFrom() {
+        return loadFrom;
+    }
+
+    void setLoadFrom(String loadFrom) {
+        this.loadFrom = loadFrom;
+    }
+
+    String getDsname() {
+        return dsname;
+    }
+
+    void setDsname(String dsname) {
+        this.dsname = dsname;
+    }
+
 }
diff --git a/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaEndpoint.java b/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaEndpoint.java
index bd6e995..da08917 100644
--- a/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaEndpoint.java
+++ b/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaEndpoint.java
@@ -22,18 +22,14 @@ import org.apache.camel.Producer;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.support.DefaultEndpoint;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import weka.core.Version;
 
 /**
  * The camel-weka component provides Data Mining functionality through Weka.
  */
-@UriEndpoint(firstVersion = "3.1.0", scheme = "weka", title = "Weka", syntax = "weka:cmd", producerOnly = true, label = "Datamining")
+@UriEndpoint(firstVersion = "3.1.0", scheme = "weka", title = "Weka", syntax = "weka:cmd?options", producerOnly = true, label = "Datamining")
 public class WekaEndpoint extends DefaultEndpoint {
 
-    static final Logger LOG = LoggerFactory.getLogger(WekaEndpoint.class);
-
     @UriParam
     private final WekaConfiguration configuration;
 
@@ -42,13 +38,17 @@ public class WekaEndpoint extends DefaultEndpoint {
         this.configuration = config;
     }
 
+    public WekaConfiguration getConfiguration() {
+        return configuration;
+    }
+
     @Override
     public WekaComponent getComponent() {
-        return (WekaComponent)super.getComponent();
+        return (WekaComponent) super.getComponent();
     }
 
     @Override
-    public Consumer createConsumer(Processor processor) throws Exception {
+    public Consumer createConsumer(Processor processor) {
         throw new UnsupportedOperationException();
     }
 
@@ -57,8 +57,9 @@ public class WekaEndpoint extends DefaultEndpoint {
         return new WekaProducer(this);
     }
 
-    public WekaConfiguration getConfiguration() {
-        return configuration;
+    @Override
+    public boolean isSingleton() {
+        return false;
     }
 
     String wekaVersion() {
diff --git a/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaProducer.java b/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaProducer.java
index e6962bf..11edc1f 100644
--- a/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaProducer.java
+++ b/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaProducer.java
@@ -16,34 +16,34 @@
  */
 package org.apache.camel.component.weka;
 
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.net.URL;
 import java.nio.file.Paths;
 
 import io.nessus.weka.AssertState;
 import io.nessus.weka.Dataset;
-import io.nessus.weka.UncheckedException;
+import io.nessus.weka.ModelLoader;
+import io.nessus.weka.ModelPersister;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
-import org.apache.camel.component.file.GenericFile;
 import org.apache.camel.component.weka.WekaConfiguration.Command;
 import org.apache.camel.support.DefaultProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import weka.classifiers.Classifier;
+import weka.classifiers.Evaluation;
 import weka.core.Instances;
-import weka.core.converters.ArffLoader;
-import weka.core.converters.CSVLoader;
-import weka.core.converters.Loader;
 
+@SuppressWarnings("checkstyle:rightcurly")
 public class WekaProducer extends DefaultProducer {
 
+    static final Logger LOG = LoggerFactory.getLogger(WekaProducer.class);
+
     public WekaProducer(WekaEndpoint endpoint) {
         super(endpoint);
+
+        // All commands are supported on the producer
+        Command cmd = getConfiguration().getCommand();
+        AssertState.notNull(cmd, "Null command");
     }
 
     @Override
@@ -68,163 +68,189 @@ public class WekaProducer extends DefaultProducer {
 
         } else if (Command.read == cmd) {
 
-            Message msg = exchange.getMessage();
-            msg.setBody(handleReadCmd(exchange));
+            Dataset dataset = handleReadCmd(exchange);
+            exchange.getMessage().setBody(dataset);
 
         } else if (Command.write == cmd) {
 
-            Message msg = exchange.getMessage();
-            msg.setBody(handleWriteCmd(exchange));
+            Object result = handleWriteCmd(exchange);
+            exchange.getMessage().setBody(result);
 
         } else if (Command.filter == cmd) {
 
-            Message msg = exchange.getMessage();
-            msg.setBody(handleFilterCmd(exchange));
+            Dataset dataset = handleFilterCmd(exchange);
+            exchange.getMessage().setBody(dataset);
+            
+        } else if (Command.model == cmd) {
+
+            Dataset dataset = handleModelCmd(exchange);
+            exchange.getMessage().setBody(dataset);
+            
+        } else if (Command.push == cmd) {
 
+            Dataset dataset = handlePushCmd(exchange);
+            exchange.getMessage().setBody(dataset);
+            
+        } else if (Command.pop == cmd) {
+
+            Dataset dataset = handlePopCmd(exchange);
+            exchange.getMessage().setBody(dataset);
+            
+        } else {
+            
+            // Not really needed here, because all commands are supported 
+            throw new UnsupportedOperationException("Unsupported on Producer: " + cmd);
         }
     }
 
-    private Dataset handleReadCmd(Exchange exchange) throws Exception {
+    Dataset handlePushCmd(Exchange exchange) throws Exception {
+        
+        String dsname = getConfiguration().getDsname();
 
-        String fpath = getConfiguration().getPath();
+        Dataset dataset = assertDatasetBody(exchange);
+        if (dsname != null) {
+            dataset.push(dsname);
+        } else {
+            dataset.push();
+        }
+        
+        return dataset;
+    }
+
+    Dataset handlePopCmd(Exchange exchange) throws Exception {
+        
+        String dsname = getConfiguration().getDsname();
 
+        Dataset dataset = assertDatasetBody(exchange);
+        if (dsname != null) {
+            dataset.pop(dsname);
+        } else {
+            dataset.pop();
+        }
+        
+        return dataset;
+    }
+
+    Dataset handleReadCmd(Exchange exchange) throws Exception {
+        
+        String fpath = getConfiguration().getPath();
+        
         if (fpath != null) {
             Dataset dataset = Dataset.create(fpath);
             return dataset;
         }
-
+        
         Dataset dataset = assertDatasetBody(exchange);
         return dataset;
     }
 
-    private Object handleWriteCmd(Exchange exchange) throws Exception {
-
+    Object handleWriteCmd(Exchange exchange) throws Exception {
+        
         Dataset dataset = assertDatasetBody(exchange);
         String fpath = getConfiguration().getPath();
-
+        
         if (fpath != null) {
-
+            
             dataset.write(Paths.get(fpath));
             return dataset;
-
+            
         } else {
-
-            // The internal implementation of DataSink does this..
+            
+            // The internal implementation of DataSink does this.. 
             // Instances.toString().getBytes()
             //
             // Therefore, we avoid creating yet another copy of the
             // instance data and call Instances.toString() as well
-
+            
             Instances instances = dataset.getInstances();
             byte[] bytes = instances.toString().getBytes();
             return new ByteArrayInputStream(bytes);
         }
     }
 
-    private Dataset handleFilterCmd(Exchange exchange) throws Exception {
-
+    Dataset handleFilterCmd(Exchange exchange) throws Exception {
+        
         String applyValue = getConfiguration().getApply();
 
         Dataset dataset = assertDatasetBody(exchange);
         dataset = dataset.apply(applyValue);
-
+        
         return dataset;
     }
 
-    private Dataset assertDatasetBody(Exchange exchange) throws Exception {
-
-        Message msg = exchange.getMessage();
-        Object body = msg.getBody();
-
-        Dataset dataset = msg.getBody(Dataset.class);
-
-        if (dataset == null) {
-
-            if (body instanceof Instances) {
-
-                dataset = Dataset.create((Instances)body);
-
-            } else if (body instanceof GenericFile) {
-
-                GenericFile<?> file = (GenericFile<?>)body;
-                AssertState.isFalse(file.isDirectory(), "Directory not supported: " + file);
-                String absolutePath = file.getAbsoluteFilePath();
-                dataset = Dataset.create(absolutePath);
-
-            } else if (body instanceof URL) {
-
-                URL url = (URL)body;
-                Instances instances = readInternal(url.openStream());
-                dataset = Dataset.create(instances);
-
-            } else if (body instanceof InputStream) {
-
-                InputStream input = (InputStream)body;
-                Instances instances = readInternal(input);
-                dataset = Dataset.create(instances);
-            }
+    Dataset handleModelCmd(Exchange exchange) throws Exception {
+        
+        Dataset dataset = assertDatasetBody(exchange);
+        
+        String dsname = getConfiguration().getDsname();
+        boolean crossValidate = getConfiguration().isXval();
+        String buildSpec = getConfiguration().getBuild();
+        String loadFrom = getConfiguration().getLoadFrom();
+        String saveTo = getConfiguration().getSaveTo();
+        
+        // Load the Model
+        
+        if (loadFrom != null) {
+            
+            Classifier cl = dataset
+                    .loadClassifier(new ModelLoader(loadFrom))
+                    .getClassifier();
+            
+            AssertState.notNull(cl, "Cannot load the classifier from: " + loadFrom);
+            LOG.debug("{}", cl);
         }
-
-        AssertState.notNull(dataset, "Cannot obtain dataset from body: " + body);
-        return dataset;
-    }
-
-    // https://github.com/tdiesler/nessus-weka/issues/11
-    private static Instances readInternal(InputStream input) {
-
-        Instances instances = null;
-
-        try {
-
-            if (input.markSupported()) {
-                input.mark(10240);
+        
+        // Build a classifier
+        
+        else if (buildSpec != null) {
+            
+            dataset.buildClassifier(buildSpec);
+            
+            // Cross Validate the Model
+            
+            if (crossValidate) {
+                int seed = getConfiguration().getSeed();
+                int folds = getConfiguration().getFolds();
+                dataset.crossValidateModel(folds, seed);
             }
-            // First try .arff
-            try {
-                Loader loader = new ArffLoader();
-                loader.setSource(input);
-                loader.getStructure();
-                instances = loader.getDataSet();
-            } catch (IOException ex) {
-                String exmsg = ex.getMessage();
-                if (!exmsg.contains("Unable to determine structure as arff")) {
-                    throw ex;
+            
+            // Validate the Model using explicit/current instances
+            
+            else {
+                
+                // Use the named dataset for training
+                if (dsname != null) {
+                    dataset.pop(dsname);
                 }
-                if (input.markSupported()) {
-                    input.reset();
-                }
-            }
-
-            // Next try .csv
-            if (instances == null) {
-
-                ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(baos));
-
-                try (BufferedReader br = new BufferedReader(new InputStreamReader(input))) {
-                    String line = br.readLine();
-                    while (line != null) {
-                        if (!line.startsWith("#")) {
-                            bw.write(line);
-                            bw.newLine();
-                        }
-                        line = br.readLine();
-                    }
-                    bw.flush();
-                }
-
-                input = new ByteArrayInputStream(baos.toByteArray());
-
-                Loader loader = new CSVLoader();
-                loader.setSource(input);
-                loader.getStructure();
-                instances = loader.getDataSet();
+                
+                // Evaluate the model with the current instances
+                dataset.evaluateModel();
             }
-
-        } catch (Exception ex) {
-            throw UncheckedException.create(ex);
+            
+            Classifier cl = dataset.getClassifier();
+            AssertState.notNull(cl, "Model command requires 'loadFrom' or 'build'");
+            LOG.debug("{}", cl);
+            
+            Evaluation ev = dataset.getEvaluation();
+            LOG.debug("{}", ev.toSummaryString());
+        }
+        
+        // Save the Model
+        
+        if (saveTo != null) {
+            dataset.consumeClassifier(new ModelPersister(saveTo));
         }
+        
+        return dataset;
+    }
 
-        return instances;
+    private Dataset assertDatasetBody(Exchange exchange) throws Exception {
+        
+        Message msg = exchange.getMessage();
+        Dataset dataset = msg.getBody(Dataset.class);
+        
+        AssertState.notNull(dataset, "Cannot obtain dataset from body: " + msg.getBody());
+        
+        return dataset;
     }
 }
diff --git a/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaTypeConverters.java b/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaTypeConverters.java
index c3548cf..1f51c59 100644
--- a/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaTypeConverters.java
+++ b/components/camel-weka/src/main/java/org/apache/camel/component/weka/WekaTypeConverters.java
@@ -17,18 +17,21 @@
 package org.apache.camel.component.weka;
 
 import java.io.ByteArrayInputStream;
+import java.io.File;
 import java.io.InputStream;
+import java.net.URL;
+import java.nio.file.Path;
 
 import io.nessus.weka.Dataset;
 import org.apache.camel.Converter;
 import weka.core.Instances;
 
-@Converter
+@Converter(generateLoader = true)
 public final class WekaTypeConverters {
-    
+
     private WekaTypeConverters() {
     }
-
+    
     @Converter
     public static InputStream toInputStream(Dataset dataset) {
         Instances instances = dataset.getInstances();
@@ -40,4 +43,29 @@ public final class WekaTypeConverters {
         byte[] bytes = instances.toString().getBytes();
         return new ByteArrayInputStream(bytes);
     }
-}
+
+    @Converter
+    public static Dataset toDataset(Instances instances) {
+        return Dataset.create(instances);
+    }
+
+    @Converter
+    public static Dataset toDataset(InputStream input) {
+        return Dataset.create(input);
+    }
+
+    @Converter
+    public static Dataset toDataset(File infile) {
+        return Dataset.create(infile.toPath());
+    }
+
+    @Converter
+    public static Dataset toDataset(Path inpath) {
+        return Dataset.create(inpath);
+    }
+
+    @Converter
+    public static Dataset toDataset(URL url) {
+        return Dataset.create(url);
+    }
+}
\ No newline at end of file
diff --git a/components/camel-weka/src/test/java/org/apache/camel/component/weka/DecisionTreeTest.java b/components/camel-weka/src/test/java/org/apache/camel/component/weka/DecisionTreeTest.java
new file mode 100644
index 0000000..ee79c22
--- /dev/null
+++ b/components/camel-weka/src/test/java/org/apache/camel/component/weka/DecisionTreeTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.weka;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import io.nessus.weka.Dataset;
+import io.nessus.weka.testing.AbstractWekaTest;
+import org.apache.camel.CamelContext;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.Assert;
+import org.junit.Test;
+import weka.classifiers.Classifier;
+import weka.classifiers.Evaluation;
+
+public class DecisionTreeTest extends AbstractWekaTest {
+    
+    @Test
+    public void testJ48WithCrossValidation() throws Exception {
+        
+        try (CamelContext camelctx = new DefaultCamelContext()) {
+            
+            camelctx.addRoutes(new RouteBuilder() {
+                
+                @Override
+                public void configure() throws Exception {
+                    
+                        // Use Weka to read the model training data
+                        from("direct:start")
+                        
+                        // Build a J48 classifier using cross-validation with 10 folds
+                        .to("weka:model?build=J48&xval=true&folds=10&seed=1")
+                        
+                        // Persist the J48 model
+                        .to("weka:model?saveTo=src/test/resources/data/sfny-j48.model");
+                }
+            });
+            camelctx.start();
+            
+            Path inpath = Paths.get("src/test/resources/data/sfny-train.arff");
+            
+            ProducerTemplate producer = camelctx.createProducerTemplate();
+            Dataset dataset = producer.requestBody("direct:start", inpath, Dataset.class);
+            
+            Classifier classifier = dataset.getClassifier();
+            Assert.assertNotNull(classifier);
+            logInfo("{}", classifier);
+            
+            Evaluation evaluation = dataset.getEvaluation();
+            Assert.assertNotNull(evaluation);
+            logInfo("{}", evaluation);
+        }
+    }
+    
+    @Test
+    public void testJ48WithTrainingData() throws Exception {
+        
+        try (CamelContext camelctx = new DefaultCamelContext()) {
+            
+            camelctx.addRoutes(new RouteBuilder() {
+                
+                @Override
+                public void configure() throws Exception {
+                    
+                        // Use Weka to read the model training data
+                        from("direct:start")
+                        
+                        // Push the current instances to the stack
+                        .to("weka:push?dsname=sfny-train")
+                        
+                        // Build a J48 classifier with a set of named instances
+                        .to("weka:model?build=J48&dsname=sfny-train");
+                }
+            });
+            camelctx.start();
+            
+            Path inpath = Paths.get("src/test/resources/data/sfny-train.arff");
+            
+            ProducerTemplate producer = camelctx.createProducerTemplate();
+            Dataset dataset = producer.requestBody("direct:start", inpath, Dataset.class);
+            
+            Classifier classifier = dataset.getClassifier();
+            Assert.assertNotNull(classifier);
+            logInfo("{}", classifier);
+            
+            Evaluation evaluation = dataset.getEvaluation();
+            Assert.assertNotNull(evaluation);
+            logInfo("{}", evaluation);
+        }
+    }
+    
+    @Test
+    public void testJ48WithCurrentInstances() throws Exception {
+        
+        try (CamelContext camelctx = new DefaultCamelContext()) {
+            
+            camelctx.addRoutes(new RouteBuilder() {
+                
+                @Override
+                public void configure() throws Exception {
+                    
+                        // Use Weka to read the model training data
+                        from("direct:start")
+                        
+                        // Build a J48 classifier using the current dataset
+                        .to("weka:model?build=J48");
+                }
+            });
+            camelctx.start();
+            
+            Path inpath = Paths.get("src/test/resources/data/sfny-train.arff");
+            
+            ProducerTemplate producer = camelctx.createProducerTemplate();
+            Dataset dataset = producer.requestBody("direct:start", inpath, Dataset.class);
+            
+            Classifier classifier = dataset.getClassifier();
+            Assert.assertNotNull(classifier);
+            logInfo("{}", classifier);
+            
+            Evaluation evaluation = dataset.getEvaluation();
+            Assert.assertNotNull(evaluation);
+            logInfo("{}", evaluation);
+        }
+    }
+}
diff --git a/components/camel-weka/src/test/java/org/apache/camel/component/weka/FilterTest.java b/components/camel-weka/src/test/java/org/apache/camel/component/weka/FilterTest.java
index 2896bd6..5275dc5 100644
--- a/components/camel-weka/src/test/java/org/apache/camel/component/weka/FilterTest.java
+++ b/components/camel-weka/src/test/java/org/apache/camel/component/weka/FilterTest.java
@@ -42,8 +42,8 @@ public class FilterTest {
                 @Override
                 public void configure() throws Exception {
 
-                    // Use the file component to read the CSV file
-                    from("file:src/test/resources/data?fileName=sfny.csv&noop=true")
+                        // Use the file component to read the CSV file
+                        from("file:src/test/resources/data?fileName=sfny.csv&noop=true")
 
                         // Convert the 'in_sf' attribute to nominal
                         .to("weka:filter?apply=NumericToNominal -R first")
@@ -63,10 +63,10 @@ public class FilterTest {
             camelctx.start();
 
             ConsumerTemplate consumer = camelctx.createConsumerTemplate();
-            consumer.receiveBody("direct:end");
-
-            Path inpath = Paths.get("target/data/sfny.arff");
-            Instances instances = DatasetUtils.read(inpath);
+            Dataset dataset = consumer.receiveBody("direct:end", Dataset.class);
+            Assert.assertEquals("sfny", dataset.getInstances().relationName());
+            
+            Instances instances = DatasetUtils.read("target/data/sfny.arff");
             Assert.assertEquals("sfny", instances.relationName());
         }
     }
@@ -80,33 +80,33 @@ public class FilterTest {
 
                 @Override
                 public void configure() throws Exception {
-
-                    from("direct:start")
-
-                        // Use Weka to read the CSV file
-                        .to("weka:read?path=src/test/resources/data/sfny.csv")
-
+                    
+                        // Use weka to read the CSV file
+                        from("direct:start")
+                    
                         // Convert the 'in_sf' attribute to nominal
                         .to("weka:filter?apply=NumericToNominal -R first")
-
+                        
                         // Move the 'in_sf' attribute to the end
                         .to("weka:filter?apply=Reorder -R 2-last,1")
-
+                        
                         // Rename the relation
                         .to("weka:filter?apply=RenameRelation -modify sfny")
-
-                        // Use Weka to write the Arff file
+                        
+                        // Use weka to write the Arff file
                         .to("weka:write?path=target/data/sfny.arff");
                 }
             });
             camelctx.start();
 
-            ProducerTemplate producer = camelctx.createProducerTemplate();
-            Dataset dataset = producer.requestBody("direct:start", null, Dataset.class);
-            Assert.assertEquals("sfny", dataset.getInstances().relationName());
+            Path inpath = Paths.get("src/test/resources/data/sfny.csv");
 
-            dataset = Dataset.create("target/data/sfny.arff");
+            ProducerTemplate producer = camelctx.createProducerTemplate();
+            Dataset dataset = producer.requestBody("direct:start", inpath, Dataset.class);
             Assert.assertEquals("sfny", dataset.getInstances().relationName());
+            
+            Instances instances = DatasetUtils.read("target/data/sfny.arff");
+            Assert.assertEquals("sfny", instances.relationName());
         }
     }
 }
diff --git a/components/camel-weka/src/test/java/org/apache/camel/component/weka/PredictionTest.java b/components/camel-weka/src/test/java/org/apache/camel/component/weka/PredictionTest.java
new file mode 100644
index 0000000..05efbfe
--- /dev/null
+++ b/components/camel-weka/src/test/java/org/apache/camel/component/weka/PredictionTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.weka;
+
+import io.nessus.weka.AssertArg;
+import io.nessus.weka.Dataset;
+import io.nessus.weka.NominalPredictor;
+import io.nessus.weka.testing.AbstractWekaTest;
+import org.apache.camel.CamelContext;
+import org.apache.camel.ConsumerTemplate;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.Assert;
+import org.junit.Test;
+import weka.core.Instances;
+
+public class PredictionTest extends AbstractWekaTest {
+    
+    @Test
+    public void testJ48() throws Exception {
+        
+        try (CamelContext camelctx = new DefaultCamelContext()) {
+            
+            camelctx.addRoutes(new RouteBuilder() {
+                
+                @Override
+                public void configure() throws Exception {
+                    
+                        // Use the file component to read the CSV file
+                        from("file:src/test/resources/data?fileName=sfny-test.arff&noop=true")
+                        
+                        // Push these instances for later use
+                        .to("weka:push?dsname=sfny-test")
+                        
+                        // Remove the class attribute 
+                        .to("weka:filter?apply=Remove -R last")
+                        
+                        // Add the 'prediction' placeholder attribute 
+                        .to("weka:filter?apply=Add -N predicted -T NOM -L 0,1")
+                        
+                        // Rename the relation 
+                        .to("weka:filter?apply=RenameRelation -modify sfny-predicted")
+                        
+                        // Load an already existing model
+                        .to("weka:model?loadFrom=src/test/resources/data/sfny-j48.model")
+                        
+                        // Use a processor to do the prediction
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                Dataset dataset = exchange.getMessage().getBody(Dataset.class);
+                                dataset.applyToInstances(new NominalPredictor());
+                            }
+                        })
+                        
+                        // Write the data file
+                        .to("weka:write?path=src/test/resources/data/sfny-predicted.arff")
+                        
+                        .to("direct:end");
+                }
+            });
+            camelctx.start();
+            
+            ConsumerTemplate consumer = camelctx.createConsumerTemplate();
+            Dataset dataset = consumer.receiveBody("direct:end", Dataset.class);
+            
+            Instances wasdata = dataset.getInstances();
+            Instances expdata = dataset.pop("sfny-test").getInstances();
+            int numInstances = expdata.numInstances();
+            
+            int correct = numCorrectlyClassified(expdata, wasdata);
+            
+            double accuracy = 100.0 * correct / numInstances;
+            int incorrect = numInstances - correct;
+            
+            logInfo(String.format("Correctly Classified Instances   %d %.4f %%", correct, accuracy));
+            logInfo(String.format("Incorrectly Classified Instances %d %.4f %%", incorrect, 100 - accuracy));
+            
+            Assert.assertEquals("88.8889", String.format("%.4f", accuracy));
+        }
+    }
+
+    private int numCorrectlyClassified(Instances expdata, Instances wasdata) {
+        AssertArg.isEqual(expdata.classIndex(), wasdata.classIndex());
+        AssertArg.isEqual(expdata.size(), wasdata.size());
+        int numInstances = expdata.numInstances();
+        int clidx = expdata.classIndex();
+        int correct = 0;
+        for (int i = 0; i < numInstances; i++) {
+            double expval = expdata.instance(i).value(clidx);
+            double wasval = wasdata.instance(i).value(clidx);
+            correct += expval == wasval ? 1 : 0;
+        }
+        return correct;
+    }
+}
diff --git a/components/camel-weka/src/test/java/org/apache/camel/component/weka/ReadWriteTest.java b/components/camel-weka/src/test/java/org/apache/camel/component/weka/ReadWriteTest.java
index d98915c..8e6eefd 100644
--- a/components/camel-weka/src/test/java/org/apache/camel/component/weka/ReadWriteTest.java
+++ b/components/camel-weka/src/test/java/org/apache/camel/component/weka/ReadWriteTest.java
@@ -40,7 +40,8 @@ public class ReadWriteTest {
             camelctx.addRoutes(new RouteBuilder() {
                 @Override
                 public void configure() throws Exception {
-                    from("direct:start").to("weka:version");
+                    from("direct:start")
+                        .to("weka:version");
                 }
             });
             camelctx.start();
@@ -59,7 +60,9 @@ public class ReadWriteTest {
             camelctx.addRoutes(new RouteBuilder() {
                 @Override
                 public void configure() throws Exception {
-                    from("file:src/test/resources/data?fileName=sfny.csv&noop=true").to("weka:read").to("direct:end");
+                    from("file:src/test/resources/data?fileName=sfny.csv&noop=true")
+                        .to("weka:read")
+                        .to("direct:end");
                 }
             });
             camelctx.start();
@@ -78,7 +81,8 @@ public class ReadWriteTest {
             camelctx.addRoutes(new RouteBuilder() {
                 @Override
                 public void configure() throws Exception {
-                    from("direct:start").to("weka:read");
+                    from("direct:start")
+                        .to("weka:read");
                 }
             });
             camelctx.start();
@@ -100,7 +104,8 @@ public class ReadWriteTest {
             camelctx.addRoutes(new RouteBuilder() {
                 @Override
                 public void configure() throws Exception {
-                    from("direct:start").to("weka:read");
+                    from("direct:start")
+                        .to("weka:read");
                 }
             });
             camelctx.start();
@@ -122,7 +127,8 @@ public class ReadWriteTest {
             camelctx.addRoutes(new RouteBuilder() {
                 @Override
                 public void configure() throws Exception {
-                    from("direct:start").to("weka:read?path=src/test/resources/data/sfny.arff");
+                    from("direct:start")
+                        .to("weka:read?path=src/test/resources/data/sfny.arff");
                 }
             });
             camelctx.start();
@@ -141,7 +147,8 @@ public class ReadWriteTest {
             camelctx.addRoutes(new RouteBuilder() {
                 @Override
                 public void configure() throws Exception {
-                    from("direct:start").to("weka:read");
+                    from("direct:start")
+                        .to("weka:read");
                 }
             });
             camelctx.start();
@@ -163,7 +170,8 @@ public class ReadWriteTest {
             camelctx.addRoutes(new RouteBuilder() {
                 @Override
                 public void configure() throws Exception {
-                    from("direct:start").to("file:target/data?fileName=sfny.arff");
+                    from("direct:start")
+                        .to("file:target/data?fileName=sfny.arff");
                 }
             });
             camelctx.start();
@@ -188,7 +196,9 @@ public class ReadWriteTest {
             camelctx.addRoutes(new RouteBuilder() {
                 @Override
                 public void configure() throws Exception {
-                    from("direct:start").to("weka:write").to("file:target/data?fileName=sfny.arff");
+                    from("direct:start")
+                        .to("weka:write")
+                        .to("file:target/data?fileName=sfny.arff");
                 }
             });
             camelctx.start();
@@ -213,7 +223,8 @@ public class ReadWriteTest {
             camelctx.addRoutes(new RouteBuilder() {
                 @Override
                 public void configure() throws Exception {
-                    from("direct:start").to("weka:write?path=target/data/sfny.arff");
+                    from("direct:start")
+                        .to("weka:write?path=target/data/sfny.arff");
                 }
             });
             camelctx.start();
@@ -229,5 +240,4 @@ public class ReadWriteTest {
             Assert.assertNotNull(dataset);
         }
     }
-
 }
diff --git a/components/camel-weka/src/test/resources/data/sfny-j48.model b/components/camel-weka/src/test/resources/data/sfny-j48.model
new file mode 100644
index 0000000..48cf857
Binary files /dev/null and b/components/camel-weka/src/test/resources/data/sfny-j48.model differ
diff --git a/components/camel-weka/src/test/resources/data/sfny-predicted.arff b/components/camel-weka/src/test/resources/data/sfny-predicted.arff
new file mode 100644
index 0000000..b5ced04
--- /dev/null
+++ b/components/camel-weka/src/test/resources/data/sfny-predicted.arff
@@ -0,0 +1,111 @@
+@relation sfny-predicted
+
+@attribute beds numeric
+@attribute bath numeric
+@attribute price numeric
+@attribute year_built numeric
+@attribute sqft numeric
+@attribute price_per_sqft numeric
+@attribute elevation numeric
+@attribute predicted {0,1}
+
+@data
+2,1,999000,1960,1000,999,10,0
+0,1,439000,1930,500,878,10,0
+3,3,2095000,1926,2200,952,4,0
+2,2,2059500,2008,1373,1500,5,0
+2,2,3450000,1900,1850,1865,9,0
+1,1,1195000,1900,1093,1093,6,0
+2,2,3159990,2012,1420,2225,8,0
+2,2,6525000,2016,2018,3233,11,0
+1,1,1595000,2015,714,2234,7,0
+2,2,4995000,1900,2024,2468,8,0
+3,3,9900000,2015,2950,3356,4,0
+2,1,899000,1990,820,1096,7,1
+0,1,699999,1988,500,1400,9,0
+0,1,775000,2009,546,1419,10,0
+1,1,1150000,2014,645,1783,18,0
+2,2,799000,1928,850,940,19,0
+2,3,2999000,1925,3200,937,12,0
+2,2,1725000,2005,976,1767,10,0
+1,1,869000,1988,670,1297,16,1
+1,1,1145000,2005,700,1636,19,0
+2,2,3500000,1986,1463,2392,23,0
+0,1,299000,1930,400,748,12,1
+0,1,539000,1957,485,1111,10,0
+2,2,4285000,2016,1747,2453,10,0
+5,5,19000000,2016,4972,3821,10,0
+1,1,588000,1970,600,980,14,0
+2,2,4285000,2016,1747,2453,15,0
+4,4,13150000,2016,3338,3939,15,0
+1,1,1000000,1912,612,1634,18,0
+2,2,3600000,1960,1340,2687,24,0
+4,4,5500000,1923,3200,1719,21,0
+1,1,779000,1902,700,1113,10,0
+0,1,725000,1960,600,1208,26,0
+2,1,385000,1999,820,470,8,1
+1,1,600000,1899,1060,566,10,0
+2,2,1599000,1973,1400,1142,14,0
+1,1,499999,2011,669,747,35,1
+2,1,545000,1939,1049,520,39,1
+0,1,835000,2013,501,1667,8,0
+1,1,1550000,2004,794,1952,11,0
+2,2,1995000,1996,1044,1911,10,0
+1,1,1695000,1927,680,2493,17,0
+3,2,1950000,1956,1600,1219,10,0
+2,2,1995000,1986,1406,1419,14,0
+1,1,529000,1986,650,814,0,1
+2,2,849000,1982,1030,824,24,0
+1,2,1088000,1998,1086,1002,6,1
+1,1,699000,2008,756,925,12,1
+3,3,1245000,1907,1503,828,23,0
+2,3,4995000,2009,2230,2240,0,1
+1,1,949000,2010,824,1152,8,1
+2,2.5,1050000,2000,1640,640,2,1
+2,2,1395000,2001,1334,1046,4,1
+2,2,1600000,2002,1173,1364,7,0
+1,1,699000,1907,932,750,59,1
+2,2,5950000,1989,3700,1608,83,1
+2,2,958000,2005,915,1047,54,1
+1,1,699000,1984,880,794,73,1
+1,1.5,378551,2000,1000,379,9,1
+2,2,1095000,2010,1135,965,30,1
+3,2,1389000,1908,1546,898,55,1
+3,2.5,1300000,1975,1642,792,4,0
+3,2,699000,1964,1250,559,48,1
+4,3,900000,2003,1870,481,54,1
+2,2,649000,1924,1320,492,72,1
+3,2,535000,1967,1824,293,102,1
+3,2.5,879000,1981,2111,416,139,1
+1,1,697000,1922,694,1004,50,1
+1,1,825000,1955,600,1375,74,1
+3,1,1095000,1895,1465,747,68,1
+1,1,1199000,1906,1139,1053,91,1
+3,2,899000,1948,1665,540,52,1
+5,5,2990000,2014,5000,598,108,1
+6,6,6895000,1902,7800,884,67,1
+1,1,599000,1900,624,960,91,1
+3,2,1099000,1905,1457,754,67,1
+5,3.5,6495000,1906,4609,1409,89,1
+4,1,1050000,1932,1767,594,55,1
+4,2.5,995000,1915,2180,456,62,1
+2,1,749000,1936,1450,517,110,1
+2,2.5,2495000,1940,1809,1379,48,1
+3,2.5,929000,2015,1391,668,5,1
+4,2,689000,1951,1473,468,21,1
+2,1,995000,1908,600,1658,36,1
+3,2,1650000,1922,2025,815,106,1
+2,2,599000,1990,862,695,181,1
+4,4,3760000,1900,3085,1219,49,1
+3,2,1799000,1926,1800,999,66,1
+4,3.5,1995000,1992,3312,602,108,1
+3,2,1159000,1977,1731,670,143,1
+3,2.5,1095000,1968,1868,586,187,1
+3,2.5,1588000,2015,2001,794,43,1
+3,2,849900,1958,1310,649,118,1
+3,2.5,1611000,2015,2001,805,153,1
+2,1,699000,1949,1050,666,64,1
+2,2,1698000,2008,1620,1048,1,1
+1,1,788000,2004,903,873,4,1
+4,4,3760000,1894,3085,1219,49,1
+1,1,649000,1983,850,764,163,1
diff --git a/components/camel-weka/src/test/resources/data/sfny-test.arff b/components/camel-weka/src/test/resources/data/sfny-test.arff
new file mode 100644
index 0000000..b7ee347
--- /dev/null
+++ b/components/camel-weka/src/test/resources/data/sfny-test.arff
@@ -0,0 +1,111 @@
+@relation sfny-test
+
+@attribute beds numeric
+@attribute bath numeric
+@attribute price numeric
+@attribute year_built numeric
+@attribute sqft numeric
+@attribute price_per_sqft numeric
+@attribute elevation numeric
+@attribute in_sf {0,1}
+
+@data
+2,1,999000,1960,1000,999,10,0
+0,1,439000,1930,500,878,10,0
+3,3,2095000,1926,2200,952,4,0
+2,2,2059500,2008,1373,1500,5,0
+2,2,3450000,1900,1850,1865,9,0
+1,1,1195000,1900,1093,1093,6,0
+2,2,3159990,2012,1420,2225,8,0
+2,2,6525000,2016,2018,3233,11,0
+1,1,1595000,2015,714,2234,7,0
+2,2,4995000,1900,2024,2468,8,0
+3,3,9900000,2015,2950,3356,4,0
+2,1,899000,1990,820,1096,7,0
+0,1,699999,1988,500,1400,9,0
+0,1,775000,2009,546,1419,10,0
+1,1,1150000,2014,645,1783,18,0
+2,2,799000,1928,850,940,19,0
+2,3,2999000,1925,3200,937,12,0
+2,2,1725000,2005,976,1767,10,0
+1,1,869000,1988,670,1297,16,0
+1,1,1145000,2005,700,1636,19,0
+2,2,3500000,1986,1463,2392,23,0
+0,1,299000,1930,400,748,12,0
+0,1,539000,1957,485,1111,10,0
+2,2,4285000,2016,1747,2453,10,0
+5,5,19000000,2016,4972,3821,10,0
+1,1,588000,1970,600,980,14,0
+2,2,4285000,2016,1747,2453,15,0
+4,4,13150000,2016,3338,3939,15,0
+1,1,1000000,1912,612,1634,18,0
+2,2,3600000,1960,1340,2687,24,0
+4,4,5500000,1923,3200,1719,21,0
+1,1,779000,1902,700,1113,10,0
+0,1,725000,1960,600,1208,26,0
+2,1,385000,1999,820,470,8,0
+1,1,600000,1899,1060,566,10,0
+2,2,1599000,1973,1400,1142,14,0
+1,1,499999,2011,669,747,35,0
+2,1,545000,1939,1049,520,39,0
+0,1,835000,2013,501,1667,8,0
+1,1,1550000,2004,794,1952,11,0
+2,2,1995000,1996,1044,1911,10,0
+1,1,1695000,1927,680,2493,17,0
+3,2,1950000,1956,1600,1219,10,0
+2,2,1995000,1986,1406,1419,14,0
+1,1,529000,1986,650,814,0,0
+2,2,849000,1982,1030,824,24,1
+1,2,1088000,1998,1086,1002,6,1
+1,1,699000,2008,756,925,12,1
+3,3,1245000,1907,1503,828,23,1
+2,3,4995000,2009,2230,2240,0,1
+1,1,949000,2010,824,1152,8,1
+2,2.5,1050000,2000,1640,640,2,1
+2,2,1395000,2001,1334,1046,4,1
+2,2,1600000,2002,1173,1364,7,1
+1,1,699000,1907,932,750,59,1
+2,2,5950000,1989,3700,1608,83,1
+2,2,958000,2005,915,1047,54,1
+1,1,699000,1984,880,794,73,1
+1,1.5,378551,2000,1000,379,9,1
+2,2,1095000,2010,1135,965,30,1
+3,2,1389000,1908,1546,898,55,1
+3,2.5,1300000,1975,1642,792,4,1
+3,2,699000,1964,1250,559,48,1
+4,3,900000,2003,1870,481,54,1
+2,2,649000,1924,1320,492,72,1
+3,2,535000,1967,1824,293,102,1
+3,2.5,879000,1981,2111,416,139,1
+1,1,697000,1922,694,1004,50,1
+1,1,825000,1955,600,1375,74,1
+3,1,1095000,1895,1465,747,68,1
+1,1,1199000,1906,1139,1053,91,1
+3,2,899000,1948,1665,540,52,1
+5,5,2990000,2014,5000,598,108,1
+6,6,6895000,1902,7800,884,67,1
+1,1,599000,1900,624,960,91,1
+3,2,1099000,1905,1457,754,67,1
+5,3.5,6495000,1906,4609,1409,89,1
+4,1,1050000,1932,1767,594,55,1
+4,2.5,995000,1915,2180,456,62,1
+2,1,749000,1936,1450,517,110,1
+2,2.5,2495000,1940,1809,1379,48,1
+3,2.5,929000,2015,1391,668,5,1
+4,2,689000,1951,1473,468,21,1
+2,1,995000,1908,600,1658,36,1
+3,2,1650000,1922,2025,815,106,1
+2,2,599000,1990,862,695,181,1
+4,4,3760000,1900,3085,1219,49,1
+3,2,1799000,1926,1800,999,66,1
+4,3.5,1995000,1992,3312,602,108,1
+3,2,1159000,1977,1731,670,143,1
+3,2.5,1095000,1968,1868,586,187,1
+3,2.5,1588000,2015,2001,794,43,1
+3,2,849900,1958,1310,649,118,1
+3,2.5,1611000,2015,2001,805,153,1
+2,1,699000,1949,1050,666,64,1
+2,2,1698000,2008,1620,1048,1,1
+1,1,788000,2004,903,873,4,1
+4,4,3760000,1894,3085,1219,49,1
+1,1,649000,1983,850,764,163,1
diff --git a/components/camel-weka/src/test/resources/data/sfny-train.arff b/components/camel-weka/src/test/resources/data/sfny-train.arff
new file mode 100644
index 0000000..89a475c
--- /dev/null
+++ b/components/camel-weka/src/test/resources/data/sfny-train.arff
@@ -0,0 +1,405 @@
+@relation sfny-train
+
+@attribute beds numeric
+@attribute bath numeric
+@attribute price numeric
+@attribute year_built numeric
+@attribute sqft numeric
+@attribute price_per_sqft numeric
+@attribute elevation numeric
+@attribute in_sf {0,1}
+
+@data
+2,2,2750000,2006,1418,1939,0,0
+1,1,475000,1920,500,950,10,0
+1,1,999000,1982,784,1274,5,0
+2,2,2000000,1928,1200,1667,10,0
+1,1,3105000,2016,1108,2802,10,0
+1,1,450000,1920,500,900,10,0
+2,2,2200000,1920,2050,1073,10,0
+2,2,1675000,2006,1225,1367,12,0
+2,2,3995000,1906,2400,1665,9,0
+3,2,3580000,1880,3000,1193,10,0
+1,1,1849000,1920,1400,1321,5,0
+1,2,1950000,1900,1300,1500,7,0
+1,1,649000,1965,750,865,10,0
+1,1,649000,1965,750,865,10,0
+2,2,2650000,2014,1240,2137,18,0
+2,2,879000,1928,800,1099,22,0
+1,1,399000,1910,475,840,10,0
+2,2,2750000,2007,1384,1987,10,0
+1,1,1760000,1969,950,1853,17,0
+2,2,1725000,2005,976,1767,19,0
+0,1,925000,1978,585,1581,24,0
+1,1,695000,1961,720,965,16,0
+0,1,779000,1975,512,1521,10,0
+3,3,4875000,2016,2017,2417,10,0
+0,1,539000,1957,485,1111,11,0
+2,2,8690000,2002,2178,3990,15,0
+3,3,4875000,2016,2017,2417,15,0
+4,4,13400000,2016,3331,4023,15,0
+0,1,485000,1902,310,1565,23,0
+2,2,3600000,1960,1340,2687,24,0
+4,5,12000000,1939,3700,3243,22,0
+2,1,1475000,1973,971,1519,10,0
+2,1,925000,1941,790,1171,27,0
+2,2,1695000,2012,1051,1613,16,0
+2,1,910000,1899,1060,858,10,0
+2,2,1965000,2005,1330,1477,16,0
+1,1,749000,2011,762,983,35,0
+2,1,365000,1925,700,521,73,0
+1,1,935000,1910,1102,848,10,0
+1,1,1780000,2007,988,1802,14,0
+2,2,2235000,1999,1548,1444,14,0
+1,1,1495000,1962,1125,1329,19,0
+0,1,307000,1910,330,930,15,0
+2,2,2900000,1987,1600,1813,24,0
+3,3,2695000,2006,1991,1354,1,0
+3,2,1750000,1900,2950,593,26,1
+1,1,798000,1926,769,1038,10,1
+2,2,334905,2000,1047,320,12,1
+1,1,649000,1983,850,764,163,1
+2,2,1298000,2008,1159,1120,0,1
+1,1,187518,2000,670,280,12,1
+2,2,895000,2006,1113,804,3,1
+2,2,1299000,2004,1453,894,5,1
+3,2,1275000,2001,1502,849,13,1
+1,1,985000,1978,884,1114,11,1
+4,4,1800000,1948,2475,727,10,1
+2,2,1200000,1909,1302,922,56,1
+3,2,1295000,1926,1675,773,77,1
+4,2.3,1398000,1904,2492,561,13,1
+1,2,795000,2014,616,1291,31,1
+4,2,1099000,1962,1267,867,69,1
+1,1,539000,2000,709,760,5,1
+2,1,750000,1924,980,765,48,1
+3,2,748000,1930,1522,491,54,1
+2,1,798888,1929,1025,779,74,1
+2,1,699000,1913,900,777,108,1
+4,2.5,699900,1982,1752,399,140,1
+5,3.5,3995000,1905,3350,1193,59,1
+2,1,699000,1907,915,764,89,1
+1,1,725000,1900,811,894,70,1
+3,2.5,799000,1947,1400,571,35,1
+3,2,899000,1948,1665,540,52,1
+4,3,1338800,1940,2330,575,119,1
+2,2,1725000,1922,1415,1219,86,1
+1,1,599000,1900,624,960,91,1
+5,5.5,3495000,1921,4310,811,73,1
+4,3,1698000,1890,1789,949,102,1
+1,1,739900,1941,875,846,70,1
+2,1,829000,1925,1145,724,71,1
+6,6.5,9500000,1937,5420,1753,3,1
+2,2,2000000,1925,1518,1318,50,1
+4,4,850000,1928,2470,344,8,1
+2,2,669000,1986,1317,508,23,1
+2,1,759900,1941,1175,647,43,1
+3,3.5,2250000,1928,3258,691,127,1
+5,3.5,2995000,1947,3890,770,181,1
+3,2,1050000,1922,1266,829,52,1
+2,1,600000,1908,1350,444,92,1
+3,2,1495000,1937,1635,914,112,1
+3,2.5,995000,1976,1959,508,163,1
+2,1,599000,1972,990,605,189,1
+2,1,795000,1941,1256,633,63,1
+2,1,599000,1941,1254,478,123,1
+2,2,1495000,1913,1174,1273,35,1
+3,3,888000,1975,1555,571,79,1
+1,1,849000,2012,886,958,2,1
+2,2,1950000,1995,1930,1010,4,1
+3,2,1799000,1926,1800,999,66,1
+3,2,995000,1956,1305,762,216,1
+2,2,1350000,1900,2150,628,9,0
+1,1,975000,1930,900,1083,10,0
+1,1,999000,1982,784,1274,5,0
+1,1,715000,1903,557,1284,3,0
+4,5,13750000,2016,3699,3717,10,0
+1,1,1195000,1900,1093,1093,10,0
+2,2,4895000,2016,1520,3220,10,0
+2,2,1675000,2006,1225,1367,12,0
+1,1,1285000,2012,749,1716,10,0
+3,3,6350000,2015,2500,2540,3,0
+2,2,3500000,1915,2000,1750,5,0
+1,1,1750000,1963,1000,1750,5,0
+2,2,1200000,1964,1200,1000,10,0
+2,2,1849000,2009,1135,1629,10,0
+0,1,469000,1932,500,938,18,0
+2,3,2999000,1925,3200,937,10,0
+1,1,1135000,2005,715,1587,10,0
+2,2,2750000,2007,1384,1987,13,0
+3,3,1249000,1962,1500,833,18,0
+2,2,3500000,1925,1550,2258,20,0
+2,2,1700000,1982,1007,1688,25,0
+1,1,695000,1961,720,965,16,0
+1,1,925000,1985,745,1242,10,0
+3,3,14950000,1931,4435,3371,10,0
+1,1,835000,1963,700,1193,12,0
+2,2,4195000,2016,1750,2397,15,0
+3,3,5750000,2016,2196,2618,15,0
+5,5,19000000,2016,4972,3821,15,0
+3,2,7350000,1999,2075,3542,23,0
+1,1,525000,1924,700,750,24,0
+1,2,1850000,2007,839,2205,23,0
+2,2,1385000,1971,962,1440,10,0
+1,1,965000,1961,787,1226,27,0
+1,1,600000,1899,1060,566,8,0
+2,2,1599000,1973,1400,1142,10,0
+1,1,735000,1928,800,919,22,0
+2,2,904000,1920,1503,601,10,0
+2,1,365000,1925,700,521,73,0
+1,1,1420000,2004,768,1849,10,0
+2,2,2800000,2007,1308,2141,14,0
+4,4,8800000,1941,3382,2602,15,0
+2,2,2200000,2000,1044,2107,2,0
+2,2,1735000,1980,1585,1095,18,0
+2,2,2499000,2004,1658,1507,24,0
+3,3,2695000,2006,1991,1354,1,0
+1,1,799000,2008,847,943,29,1
+1,1,798000,1926,769,1038,10,1
+1,1.5,849000,1996,1127,753,13,1
+2,2,1700000,1987,1250,1360,11,1
+2,2,2149000,2008,1317,1632,0,1
+3,2.5,1995000,2008,2354,847,23,1
+1,1,998000,2006,872,1144,3,1
+2,2.5,850000,2000,1136,748,5,1
+1,1.5,775000,2009,835,928,14,1
+1,1,725000,1978,1063,682,60,1
+2,2.5,2350000,2008,1314,1788,19,1
+3,3,1575000,1993,2233,705,62,1
+4,4,1800000,1948,2347,767,87,1
+2,1.3,1097000,1904,1493,735,14,1
+3,2,979000,1900,1440,680,33,1
+3,3.5,1799000,1900,2449,735,75,1
+1,1,735000,1983,779,944,12,1
+3,2,549000,1915,1972,278,51,1
+2,1.5,749000,1949,1348,556,56,1
+5,2,899000,1972,1940,463,81,1
+2,1,699000,1913,900,777,108,1
+2,1,688000,1950,1145,601,143,1
+1,1,650000,1955,600,1083,74,1
+3,2,1495000,1927,1520,984,105,1
+1,1,725000,1900,811,894,70,1
+3,2,795000,1954,1350,589,36,1
+6,4,1198000,1937,1965,610,79,1
+3,2.5,1388000,1928,1905,729,160,1
+3,3,1995000,1922,1915,1042,88,1
+3,2,1200000,1929,1284,935,121,1
+3,2,2395000,1929,2323,1031,73,1
+5,2,6298000,1914,3585,1757,26,1
+4,4,1595000,1925,2750,580,81,1
+2,1,875000,1908,1158,756,84,1
+4,3,3595000,1931,3017,1192,9,1
+4,3.5,9895000,2008,6024,1643,62,1
+2,1,648000,1921,1125,576,11,1
+2,1.5,729000,1942,1012,720,27,1
+6,3.5,995000,2001,3080,323,55,1
+3,1,1319000,1925,1752,753,141,1
+3,2,995000,1956,1305,762,216,1
+2,2,1895000,1907,1756,1079,54,1
+2,1,1495000,1908,1700,879,98,1
+3,3.5,2195000,1922,2168,1012,125,1
+4,3,1388000,1968,2275,610,163,1
+2,1,915000,1954,1251,731,24,1
+2,1,795000,1941,1256,633,63,1
+3,2.5,1539514,2014,2024,761,136,1
+1,1,699000,1908,750,932,36,1
+1,1,599000,1945,631,949,84,1
+2,2,1675000,2012,1562,1072,2,1
+0,1,539000,2000,709,760,5,1
+5,2.5,1800000,1890,3073,586,76,1
+1,1,629000,1903,500,1258,9,0
+1,1,975000,1930,900,1083,12,0
+1,1,1249000,1987,826,1512,3,0
+2,2,2498000,2005,1260,1983,2,0
+2,2,1185000,1900,1000,1185,4,0
+2,2,1185000,1900,1000,1185,10,0
+2,2,6525000,2016,2018,3233,10,0
+2,1,999000,1985,799,1250,5,0
+1,1,1595000,2015,714,2234,10,0
+3,3,6550000,2015,2500,2620,3,0
+2,2,3500000,1910,1887,1855,5,0
+0,1,775000,2009,546,1419,6,0
+0,1,319000,1941,500,638,10,0
+1,1,615000,1960,750,820,11,0
+0,1,569000,1962,543,1048,8,0
+2,3,2999000,1925,3200,937,10,0
+1,1,1145000,2005,700,1636,10,0
+1,1,399000,1910,475,840,14,0
+3,3,1500000,1962,1600,938,18,0
+0,1,525000,1940,525,1000,21,0
+2,2,1700000,1982,1007,1688,25,0
+4,5,17750000,2012,4476,3966,20,0
+1,1,1319000,1958,1000,1319,10,0
+3,3,10225000,2016,3007,3400,10,0
+10,10,7995000,1910,6400,1249,12,0
+2,2,4195000,2016,1742,2408,15,0
+3,3,9350000,2016,3054,3062,15,0
+0,1,850000,1924,546,1557,10,0
+1,1,1200000,1969,600,2000,23,0
+0,1,545000,1900,387,1408,10,0
+1,1,535000,1900,450,1189,27,0
+1,1,779000,1902,700,1113,13,0
+2,1,1250000,1922,1145,1092,30,0
+2,1,910000,1899,1060,858,8,0
+2,2,4625000,1987,1695,2729,10,0
+2,2,4625000,1987,1695,2729,29,0
+2,2,904000,1920,1503,601,10,0
+1,1,935000,1910,1102,848,8,0
+1,1,1550000,2004,794,1952,10,0
+1,1,411500,1921,586,702,6,0
+2,2,1850000,1996,1044,1772,17,0
+0.5,1,384900,1962,540,713,10,0
+3,3,1850000,2005,1353,1367,8,0
+2,2,1575000,1930,1324,1190,33,0
+4,3,3895000,2001,2277,1711,0,0
+1,1.5,899000,1997,1453,619,3,1
+1,1,1495000,1927,2275,657,10,1
+1,1.5,1365000,1996,1607,849,13,1
+2,2,1980000,2009,1469,1348,0,1
+3,2,1995000,2006,1362,1465,3,1
+3,2.5,1995000,2008,2354,847,23,1
+2,2,1659000,2000,1165,1424,4,1
+3,2.5,2850000,2013,2075,1373,5,1
+3,2,1399000,1892,1809,773,41,1
+2,2,849000,1911,1100,772,66,1
+1,1,740200,1920,989,748,41,1
+2,2,1199000,1964,1100,1090,68,1
+1,1,795000,2000,990,803,7,1
+1,1,670000,2014,507,1321,20,1
+3,3,1488888,1977,2100,709,38,1
+3,2,1250000,1924,1450,862,83,1
+3,2,688000,1942,1441,477,34,1
+6,3,1600000,1926,2567,623,52,1
+2,1,599000,1942,707,847,61,1
+3,1.5,699000,1910,1625,430,94,1
+2,1,500000,1958,1166,429,136,1
+3,2,1195000,1907,1396,856,33,1
+1,1,650000,1955,600,1083,74,1
+4,4.5,5200000,1952,4813,1080,238,1
+4,4,2650000,1900,3816,694,75,1
+2,1,699000,1944,995,703,41,1
+3,3,1100000,1925,2633,418,91,1
+3,2,989000,1940,1603,617,227,1
+0,1,499000,1900,510,978,91,1
+3,2,1395000,1909,1877,743,57,1
+4,2.5,2549000,1907,2746,928,75,1
+3,3,1139000,2008,1532,743,44,1
+2,1,699000,1907,1200,583,14,1
+5,3,950000,1939,1846,515,97,1
+2,1.5,1425000,1925,1360,1048,14,1
+3,2,358000,1989,1325,270,5,1
+2,1,480000,1915,680,706,13,1
+2,2,767000,1916,1380,556,31,1
+2,1,725000,1945,1040,697,60,1
+5,3.5,1698000,1966,2769,613,176,1
+1,1,350000,1908,600,583,43,1
+1,1,599000,1961,680,881,56,1
+3,2,1595000,1961,1515,1053,103,1
+4,2,1798000,1951,2050,877,131,1
+5,3.5,2250000,1962,3729,603,174,1
+2,1,915000,1954,1251,731,24,1
+4,2,848000,1949,1646,515,69,1
+3,2.5,1339000,2015,2133,628,143,1
+2,2.5,3495000,1900,1968,1776,76,1
+3,3,758000,1989,2157,351,90,1
+2,2,1695000,2007,1610,1053,2,1
+2,2,849000,1982,1030,824,24,1
+2,1,695000,1923,1045,665,106,1
+0,1,439000,1930,500,878,10,0
+2,1,1895000,1921,1000,1895,12,0
+0,1,1110000,2008,698,1590,5,0
+2,1.5,2650000,1915,2500,1060,5,0
+1,2,1699000,1900,1500,1133,5,0
+0,1,625000,1964,800,781,8,0
+2,2,4895000,2016,1520,3220,11,0
+1,1,1550000,1926,1000,1550,6,0
+2,2,3995000,1906,2400,1665,10,0
+3,3,5985000,1909,3300,1814,3,0
+1,1,1250000,2007,720,1736,6,0
+0,1,390000,1955,550,709,6,0
+0,1,699999,1988,500,1400,10,0
+1,1,1590000,2008,785,2025,12,0
+1,1,549000,1924,750,732,10,0
+2,3,2999000,1925,3200,937,12,0
+1,1,1760000,1969,950,1853,10,0
+2,2,1725000,2005,1144,1508,16,0
+1,1,1135000,2005,715,1587,19,0
+2,2,2025000,1940,1433,1413,21,0
+0,1,449000,1962,550,816,10,0
+5,5,27500000,1930,7500,3667,21,0
+2,2,4240000,2016,1741,2435,10,0
+4,4,13400000,2016,3331,4023,10,0
+0,1,349000,1960,400,873,13,0
+2,2,4240000,2016,1741,2435,15,0
+3,3,10225000,2016,3007,3400,15,0
+7,7,19500000,1994,4238,4601,10,0
+2,2,4250000,2011,1504,2826,23,0
+1,1,535000,1900,450,1189,10,0
+2,2,1350000,1931,1300,1038,36,0
+1,1,649000,1929,800,811,25,0
+1,1,650000,1907,720,903,32,0
+8,7,2300000,1910,4180,550,9,0
+0,1,325000,1910,375,867,11,0
+1,1,749000,2011,762,983,10,0
+2,1,559900,1925,1200,467,51,0
+0,1,820000,2013,533,1538,8,0
+2,2,1635000,2004,957,1708,10,0
+2,2,2175000,1999,1569,1386,10,0
+2,2,1995000,1996,1044,1911,17,0
+1,1,515000,1962,725,710,10,0
+3,3,1850000,2005,1353,1367,10,0
+3,3,1495000,1990,1360,1099,0,0
+1,1,550000,1982,724,760,24,1
+1,1,598000,2005,534,1120,5,1
+4,4,4300000,2006,3321,1295,10,1
+1,1,649000,2011,674,963,14,1
+2,2,3600000,2009,1652,2179,0,1
+2,2,1650000,1937,1640,1006,7,1
+2,2,2799000,2008,1328,2108,35,1
+1,1,788000,2004,903,873,4,1
+2,2,950000,2000,1258,755,6,1
+2,2,879000,1912,950,925,53,1
+1,1,618000,1973,705,877,72,1
+1,1,699000,1982,780,896,51,1
+1,1,529000,1966,791,669,69,1
+2,2,779000,2007,808,964,7,1
+2,2,1199000,2014,943,1271,20,1
+3,2,1389000,1908,1546,898,55,1
+3,2,998000,1907,1464,682,84,1
+2,1,859000,1890,1100,781,44,1
+2,2,875000,1909,1700,515,54,1
+3,1,648800,1940,1325,490,66,1
+3,2,749000,1907,1513,495,95,1
+3,2.5,879000,1981,2111,416,139,1
+3,2,1195000,1907,1396,856,33,1
+1,1,775000,1955,600,1292,74,1
+2,2,989000,1984,988,1001,46,1
+2,2,1680000,1962,1850,908,89,1
+3,2,848000,1940,1500,565,42,1
+6,3.5,949000,1918,2473,384,97,1
+3,1,1295000,1890,1772,731,65,1
+0,1,499000,1900,510,978,91,1
+3,3,1785000,1925,1970,906,58,1
+3,2,995000,2000,1393,714,75,1
+3,2,1080000,1914,1954,553,49,1
+2,1,799000,1938,1150,695,36,1
+2,1,749000,1936,1450,517,110,1
+1,1,865000,1993,960,901,17,1
+3,2.5,899000,2015,1391,646,5,1
+1,1,499000,1900,1076,464,19,1
+3,1,660000,1900,1520,434,35,1
+4,3,3420000,1926,5113,669,98,1
+3,2,1049000,1947,1626,645,179,1
+2,1,550000,1908,800,688,43,1
+4,3,1895000,2001,2041,928,61,1
+3,2,849000,1947,1622,523,106,1
+2,2,849000,1978,1555,546,139,1
+3,2,1080000,1989,1524,709,185,1
+3,2,725000,1975,1474,492,34,1
+1,1,439000,2002,667,658,80,1
+3,2.5,1294000,2015,2133,607,143,1
+4,2,699000,1949,1550,451,11,1
+2,2,1698000,2008,1620,1048,1,1
+3,2,2219000,2012,1921,1155,13,1
+2,2.5,2495000,1940,1809,1379,48,1
+3,2,1650000,1922,1483,1113,106,1
diff --git a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/WekaEndpointBuilderFactory.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/WekaEndpointBuilderFactory.java
index 75d53a8..8dfffe6 100644
--- a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/WekaEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/WekaEndpointBuilderFactory.java
@@ -38,17 +38,6 @@ public interface WekaEndpointBuilderFactory {
             return (AdvancedWekaEndpointBuilder) this;
         }
         /**
-         * The Weka filter/classifier spec (i.e. Name Options).
-         * 
-         * The option is a: <code>java.lang.String</code> type.
-         * 
-         * Group: producer
-         */
-        default WekaEndpointBuilder apply(String apply) {
-            doSetProperty("apply", apply);
-            return this;
-        }
-        /**
          * Whether the producer should be started lazy (on the first message).
          * By starting lazy you can use this to allow CamelContext and routes to
          * startup in situations where a producer may otherwise fail during
@@ -89,7 +78,7 @@ public interface WekaEndpointBuilderFactory {
             return this;
         }
         /**
-         * An optional in/out path for the read/write commands.
+         * An in/out path for the read/write commands.
          * 
          * The option is a: <code>java.lang.String</code> type.
          * 
@@ -99,6 +88,133 @@ public interface WekaEndpointBuilderFactory {
             doSetProperty("path", path);
             return this;
         }
+        /**
+         * The filter spec (i.e. Name Options).
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Group: filter
+         */
+        default WekaEndpointBuilder apply(String apply) {
+            doSetProperty("apply", apply);
+            return this;
+        }
+        /**
+         * The classifier spec (i.e. Name Options).
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Group: model
+         */
+        default WekaEndpointBuilder build(String build) {
+            doSetProperty("build", build);
+            return this;
+        }
+        /**
+         * The named dataset to train the classifier with.
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Group: model
+         */
+        default WekaEndpointBuilder dsname(String dsname) {
+            doSetProperty("dsname", dsname);
+            return this;
+        }
+        /**
+         * Number of folds to use for cross-validation.
+         * 
+         * The option is a: <code>int</code> type.
+         * 
+         * Default: 10
+         * Group: model
+         */
+        default WekaEndpointBuilder folds(int folds) {
+            doSetProperty("folds", folds);
+            return this;
+        }
+        /**
+         * Number of folds to use for cross-validation.
+         * 
+         * The option will be converted to a <code>int</code> type.
+         * 
+         * Default: 10
+         * Group: model
+         */
+        default WekaEndpointBuilder folds(String folds) {
+            doSetProperty("folds", folds);
+            return this;
+        }
+        /**
+         * Path to load the model from.
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Group: model
+         */
+        default WekaEndpointBuilder loadFrom(String loadFrom) {
+            doSetProperty("loadFrom", loadFrom);
+            return this;
+        }
+        /**
+         * Path to save the model to.
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Group: model
+         */
+        default WekaEndpointBuilder saveTo(String saveTo) {
+            doSetProperty("saveTo", saveTo);
+            return this;
+        }
+        /**
+         * An optional seed for the randomizer.
+         * 
+         * The option is a: <code>int</code> type.
+         * 
+         * Default: 1
+         * Group: model
+         */
+        default WekaEndpointBuilder seed(int seed) {
+            doSetProperty("seed", seed);
+            return this;
+        }
+        /**
+         * An optional seed for the randomizer.
+         * 
+         * The option will be converted to a <code>int</code> type.
+         * 
+         * Default: 1
+         * Group: model
+         */
+        default WekaEndpointBuilder seed(String seed) {
+            doSetProperty("seed", seed);
+            return this;
+        }
+        /**
+         * Flag on whether to use cross-validation with the current dataset.
+         * 
+         * The option is a: <code>boolean</code> type.
+         * 
+         * Default: false
+         * Group: model
+         */
+        default WekaEndpointBuilder xval(boolean xval) {
+            doSetProperty("xval", xval);
+            return this;
+        }
+        /**
+         * Flag on whether to use cross-validation with the current dataset.
+         * 
+         * The option will be converted to a <code>boolean</code> type.
+         * 
+         * Default: false
+         * Group: model
+         */
+        default WekaEndpointBuilder xval(String xval) {
+            doSetProperty("xval", xval);
+            return this;
+        }
     }
 
     /**
@@ -176,11 +292,26 @@ public interface WekaEndpointBuilderFactory {
          * Since: 3.1
          * Maven coordinates: org.apache.camel:camel-weka
          * 
-         * Syntax: <code>weka:cmd</code>
+         * Syntax: <code>weka:cmd?options</code>
+         * 
+         * Path parameter: read
+         * The read command
+         * 
+         * Path parameter: write
+         * The write command
          * 
          * Path parameter: filter
          * The filter command
          * 
+         * Path parameter: model
+         * The model command
+         * 
+         * Path parameter: push
+         * The push command
+         * 
+         * Path parameter: pop
+         * The pop command
+         * 
          * Path parameter: version
          * The version command
          */
@@ -196,11 +327,26 @@ public interface WekaEndpointBuilderFactory {
      * Since: 3.1
      * Maven coordinates: org.apache.camel:camel-weka
      * 
-     * Syntax: <code>weka:cmd</code>
+     * Syntax: <code>weka:cmd?options</code>
+     * 
+     * Path parameter: read
+     * The read command
+     * 
+     * Path parameter: write
+     * The write command
      * 
      * Path parameter: filter
      * The filter command
      * 
+     * Path parameter: model
+     * The model command
+     * 
+     * Path parameter: push
+     * The push command
+     * 
+     * Path parameter: pop
+     * The pop command
+     * 
      * Path parameter: version
      * The version command
      */
diff --git a/docs/components/modules/ROOT/pages/weka-component.adoc b/docs/components/modules/ROOT/pages/weka-component.adoc
index f18c234..87d574f 100644
--- a/docs/components/modules/ROOT/pages/weka-component.adoc
+++ b/docs/components/modules/ROOT/pages/weka-component.adoc
@@ -51,33 +51,45 @@ The Weka component supports 2 options, which are listed below.
 The Weka endpoint is configured using URI syntax:
 
 ----
-weka:cmd
+weka:cmd?options
 ----
 
 with the following path and query parameters:
 
-=== Path Parameters (2 parameters):
+=== Path Parameters (7 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
+| *read* | The read command |  | String
+| *write* | The write command |  | String
 | *filter* | The filter command |  | String
+| *model* | The model command |  | String
+| *push* | The push command |  | String
+| *pop* | The pop command |  | String
 | *version* | The version command |  | String
 |===
 
 
-=== Query Parameters (5 parameters):
+=== Query Parameters (12 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
-| *apply* (producer) | The Weka filter/classifier spec (i.e. Name Options) |  | String
 | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...]
-| *path* (producer) | An optional in/out path for the read/write commands |  | String
+| *path* (producer) | An in/out path for the read/write commands |  | String
 | *basicPropertyBinding* (advanced) | Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean
 | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
+| *apply* (filter) | The filter spec (i.e. Name Options) |  | String
+| *build* (model) | The classifier spec (i.e. Name Options) |  | String
+| *dsname* (model) | The named dataset to train the classifier with |  | String
+| *folds* (model) | Number of folds to use for cross-validation | 10 | int
+| *loadFrom* (model) | Path to load the model from |  | String
+| *saveTo* (model) | Path to save the model to |  | String
+| *seed* (model) | An optional seed for the randomizer | 1 | int
+| *xval* (model) | Flag on whether to use cross-validation with the current dataset | false | boolean
 |===
 // endpoint options: END
 // spring-boot-auto-configure options: START
@@ -119,9 +131,10 @@ This component is not supported in Karaf
 
 == Samples
 
-This first example shows how to read a CSV file with the file component and then 
-pass it on to Weka. In Weka we apply a few filters to the data set and then pass it on to
-the file component for writing. 
+=== Read + Filter + Write
+
+This first example shows how to read a CSV file with the file component and then pass it on to Weka. 
+In Weka we apply a few filters to the data set and then pass it on to the file component for writing. 
 
 [source,java]
 ----
@@ -129,7 +142,7 @@ the file component for writing.
     public void configure() throws Exception {
         
         // Use the file component to read the CSV file
-        from("file:src/test/resources/data?fileName=sfny.csv&noop=true")
+        from("file:src/test/resources/data?fileName=sfny.csv")
         
         // Convert the 'in_sf' attribute to nominal
         .to("weka:filter?apply=NumericToNominal -R first")
@@ -142,22 +155,19 @@ the file component for writing.
         
         // Use the file component to write the Arff file
         .to("file:target/data?fileName=sfny.arff")
-        
-        .to("direct:end");
     }
 ----
 
 The second example does the same as above without use of the file component.
+For supported input types see `WekaTypeConverters`.
 
 [source,java]
 ----
     @Override
     public void configure() throws Exception {
         
-        from("direct:start")
-        
-        // Use Weka to read the CSV file
-        .to("weka:read?path=src/test/resources/data/sfny.csv")
+        // Consume the dataset from the direct endpoint
+        from("direct:start")
         
         // Convert the 'in_sf' attribute to nominal
         .to("weka:filter?apply=NumericToNominal -R first")
@@ -172,3 +182,89 @@ The second example does the same as above without use of the file component.
         .to("weka:write?path=target/data/sfny.arff");
     }
 ----
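+
+One way to drive this route is with a `ProducerTemplate`. The snippet below is a minimal sketch; which
+payload types are accepted (file, stream, URL, ...) is determined by the converters registered in
+`WekaTypeConverters`.
+
+[source,java]
+----
+try (CamelContext camelctx = new DefaultCamelContext()) {
+
+    camelctx.addRoutes(new RouteBuilder() {
+
+        @Override
+        public void configure() throws Exception {
+
+            from("direct:start")
+            .to("weka:filter?apply=NumericToNominal -R first")
+            .to("weka:write?path=target/data/sfny.arff");
+        }
+    });
+    camelctx.start();
+
+    // Send the CSV content to the route (assumes an InputStream payload is among the
+    // types supported by WekaTypeConverters)
+    ProducerTemplate template = camelctx.createProducerTemplate();
+    template.sendBody("direct:start", new FileInputStream("src/test/resources/data/sfny.csv"));
+}
+----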
+
+=== Building a Model
+
+When building a model, we first choose the classification algorithm to use and then train it with some data. The result is the trained model that we can later use to classify unseen data.
+
+Here we train J48 with 10-fold cross-validation.
+
+[source,java]
+----
+try (CamelContext camelctx = new DefaultCamelContext()) {
+    
+    camelctx.addRoutes(new RouteBuilder() {
+        
+        @Override
+        public void configure() throws Exception {
+            
+            // Use weka to read the model training data
+            from("file:src/test/resources/data?fileName=sfny-train.arff")
+            
+            // Build a J48 classifier using cross-validation with 10 folds
+            .to("weka:model?build=J48&xval=true&folds=10&seed=1")
+                    
+            // Persist the J48 model
+            .to("weka:model?saveTo=src/test/resources/data/sfny-j48.model");
+        }
+    });
+    camelctx.start();
+}
+----
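+
+For orientation, what this route does corresponds roughly to the following plain Weka API calls. This is an
+illustrative sketch only, not what the component executes verbatim.
+
+[source,java]
+----
+// Read the training data and mark the last attribute (in_sf) as the class
+Instances instances = DataSource.read("src/test/resources/data/sfny-train.arff");
+instances.setClassIndex(instances.numAttributes() - 1);
+
+// Build the J48 decision tree on the training data
+J48 classifier = new J48();
+classifier.buildClassifier(instances);
+
+// Evaluate the same setup with 10-fold cross-validation (seed 1)
+Evaluation evaluation = new Evaluation(instances);
+evaluation.crossValidateModel(new J48(), instances, 10, new Random(1));
+
+// Persist the model
+SerializationHelper.write("src/test/resources/data/sfny-j48.model", classifier);
+----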
+
+=== Predicting a Class
+
+Here we use a `Processor` to access functionality that is not directly available from endpoint URIs.
+
+If you come here directly and this syntax looks a bit overwhelming, have a brief look at the https://tdiesler.github.io/nessus-weka/#_nessus_api_concepts[Nessus API Concepts] section first.
+
+[source,java]
+----
+try (CamelContext camelctx = new DefaultCamelContext()) {
+    
+    camelctx.addRoutes(new RouteBuilder() {
+        
+        @Override
+        public void configure() throws Exception {
+            
+            // Use weka to read the data file
+            from("file:src/test/resources/data?fileName=sfny-test.arff")
+            
+            // Remove the class attribute 
+            .to("weka:filter?apply=Remove -R last")
+            
+            // Add the 'prediction' placeholder attribute 
+            .to("weka:filter?apply=Add -N predicted -T NOM -L 0,1")
+            
+            // Rename the relation 
+            .to("weka:filter?apply=RenameRelation -modify sfny-predicted")
+            
+            // Load an already existing model
+            .to("weka:model?loadFrom=src/test/resources/data/sfny-j48.model")
+            
+            // Use a processor to do the prediction
+            .process(new Processor() {
+                public void process(Exchange exchange) throws Exception {
+                    Dataset dataset = exchange.getMessage().getBody(Dataset.class);
+                    dataset.applyToInstances(new NominalPredictor());
+                }
+            })
+                    
+            // Write the data file
+            .to("weka:write?path=src/test/resources/data/sfny-predicted.arff");
+        }
+    });
+    camelctx.start();
+}
+----
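+
+In essence the processor asks the loaded classifier for a prediction per instance and stores it in the
+dataset. With the plain Weka API that could look like the sketch below (illustrative only; the route above
+additionally replaces the original class attribute with the `predicted` placeholder first).
+
+[source,java]
+----
+// Load the serialized J48 model
+Classifier classifier = (Classifier) SerializationHelper.read("src/test/resources/data/sfny-j48.model");
+
+// Read the unseen data and mark the last attribute as the (to be predicted) class
+Instances instances = DataSource.read("src/test/resources/data/sfny-test.arff");
+instances.setClassIndex(instances.numAttributes() - 1);
+
+for (int i = 0; i < instances.numInstances(); i++) {
+    // Predict the nominal class and store it in the class attribute
+    double label = classifier.classifyInstance(instances.instance(i));
+    instances.instance(i).setClassValue(label);
+}
+----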
+
+== Resources
+
+* https://www.cs.waikato.ac.nz/ml/weka/book.html[Practical Machine Learning Tools and Techniques,window=_blank]
+* https://www.cs.waikato.ac.nz/ml/weka/courses.html[Machine Learning Courses,window=_blank]
+* https://waikato.github.io/weka-wiki/documentation/[Weka Documentation,window=_blank]
+* https://tdiesler.github.io/nessus-weka[Nessus-Weka,window=_blank]
diff --git a/parent/pom.xml b/parent/pom.xml
index d377a32..f713097 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -476,7 +476,7 @@
         <mybatis-version>3.5.3</mybatis-version>
         <narayana-version>5.10.4.Final</narayana-version>
         <nessus-ipfs-version>1.0.0.Beta4</nessus-ipfs-version>
-        <nessus-weka-version>1.0.0</nessus-weka-version>
+        <nessus-weka-version>1.0.1</nessus-weka-version>
         <nsq-client-version>1.0.0.RC4</nsq-client-version>
         <neoscada-version>0.4.0</neoscada-version>
         <netty3-version>3.10.6.Final</netty3-version>