You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/29 20:12:31 UTC

[14/50] [abbrv] incubator-apex-malhar git commit: Removed ActivationListener interface from Parsers. Added XSD validation to XML parser. Changed XML parser to use JAXB instead of XStream to support XSD validation. Ignored test resource XSD file in Rat check.

Removed ActivationListener interface from Parsers.
Added XSD validation to XML parser. Changed XML parser to use JAXB instead of XStream to support XSD validation.
Ignored test resource XSD file in Rat check.
Added serialization tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/7a1d0871
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/7a1d0871
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/7a1d0871

Branch: refs/heads/master
Commit: 7a1d08718a4e689503948c0d4a79065bee1910d1
Parents: 23dabc4
Author: ishark <is...@datatorrent.com>
Authored: Tue Dec 15 16:03:09 2015 -0800
Committer: ishark <is...@datatorrent.com>
Committed: Wed Dec 23 16:48:39 2015 -0800

----------------------------------------------------------------------
 .../contrib/formatter/CsvFormatter.java         |  12 --
 .../datatorrent/contrib/parser/CsvParser.java   |  21 +--
 library/pom.xml                                 |   6 -
 .../datatorrent/lib/formatter/Formatter.java    |   4 +-
 .../lib/formatter/JsonFormatter.java            |  11 +-
 .../datatorrent/lib/formatter/XmlFormatter.java |  73 ++++----
 .../com/datatorrent/lib/parser/JsonParser.java  |  18 +-
 .../java/com/datatorrent/lib/parser/Parser.java |  55 ++++--
 .../com/datatorrent/lib/parser/XmlParser.java   | 173 +++++++++++--------
 .../lib/util/ReusableStringReader.java          |   4 +
 .../lib/formatter/JsonFormatterTest.java        |   2 -
 .../lib/formatter/XmlFormatterTest.java         |  72 ++++++--
 .../datatorrent/lib/parser/JsonParserTest.java  |   2 -
 .../datatorrent/lib/parser/XmlParserTest.java   | 150 ++++++++++++++--
 library/src/test/resources/employeeBean.xsd     |  23 +++
 pom.xml                                         |   2 +
 16 files changed, 412 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7a1d0871/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java b/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java
index e5bbd3c..78a7c78 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java
@@ -129,18 +129,6 @@ public class CsvFormatter extends Formatter<String>
   }
 
   @Override
-  public void activate(Context context)
-  {
-
-  }
-
-  @Override
-  public void deactivate()
-  {
-
-  }
-
-  @Override
   public String convert(Object tuple)
   {
     try {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7a1d0871/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java b/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java
index 0c5f8d2..ada368c 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java
@@ -62,7 +62,7 @@ import com.datatorrent.netlet.util.DTThrowable;
  * @since 3.2.0
  */
 @InterfaceStability.Evolving
-public class CsvParser extends Parser<String>
+public class CsvParser extends Parser<String, String>
 {
 
   private ArrayList<Field> fields;
@@ -148,19 +148,6 @@ public class CsvParser extends Parser<String>
       }
     }
   }
-
-  @Override
-  public void activate(Context context)
-  {
-
-  }
-
-  @Override
-  public void deactivate()
-  {
-
-  }
-
   @Override
   public Object convert(String tuple)
   {
@@ -185,6 +172,12 @@ public class CsvParser extends Parser<String>
     }
   }
 
+  @Override
+  public String processErorrTuple(String input)
+  {
+    return input;
+  }
+
   public static class Field
   {
     String name;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7a1d0871/library/pom.xml
----------------------------------------------------------------------
diff --git a/library/pom.xml b/library/pom.xml
index cbc0fb7..a3d987e 100644
--- a/library/pom.xml
+++ b/library/pom.xml
@@ -300,12 +300,6 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <!-- required by Xml parser and formatter -->
-      <groupId>com.thoughtworks.xstream</groupId>
-      <artifactId>xstream</artifactId>
-      <version>1.4.8</version>
-    </dependency>
-    <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-websocket</artifactId>
       <version>${jetty.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7a1d0871/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java b/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java
index 0de9070..b7101fb 100644
--- a/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java
+++ b/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java
@@ -24,7 +24,6 @@ import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator.ActivationListener;
 import com.datatorrent.api.annotation.InputPortFieldAnnotation;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.common.util.BaseOperator;
@@ -47,8 +46,7 @@ import com.datatorrent.lib.converter.Converter;
  * @since 3.2.0
  */
 @InterfaceStability.Evolving
-public abstract class Formatter<OUTPUT> extends BaseOperator implements Converter<Object, OUTPUT>,
-    ActivationListener<Context>
+public abstract class Formatter<OUTPUT> extends BaseOperator implements Converter<Object, OUTPUT>
 {
   protected transient Class<?> clazz;
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7a1d0871/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java b/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java
index 627bf95..4ed24b4 100644
--- a/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java
+++ b/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java
@@ -28,10 +28,9 @@ import org.codehaus.jackson.map.ObjectWriter;
 import org.codehaus.jackson.map.SerializationConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hadoop.classification.InterfaceStability;
 
-import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.netlet.util.DTThrowable;
 
 /**
@@ -51,7 +50,7 @@ public class JsonFormatter extends Formatter<String>
   protected String dateFormat;
 
   @Override
-  public void activate(Context context)
+  public void setup(OperatorContext context)
   {
     try {
       ObjectMapper mapper = new ObjectMapper();
@@ -68,12 +67,6 @@ public class JsonFormatter extends Formatter<String>
   }
 
   @Override
-  public void deactivate()
-  {
-
-  }
-
-  @Override
   public String convert(Object tuple)
   {
     try {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7a1d0871/library/src/main/java/com/datatorrent/lib/formatter/XmlFormatter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/formatter/XmlFormatter.java b/library/src/main/java/com/datatorrent/lib/formatter/XmlFormatter.java
index 35ee7b7..9ad2612 100644
--- a/library/src/main/java/com/datatorrent/lib/formatter/XmlFormatter.java
+++ b/library/src/main/java/com/datatorrent/lib/formatter/XmlFormatter.java
@@ -18,21 +18,20 @@
  */
 package com.datatorrent.lib.formatter;
 
-import java.io.Writer;
+import java.io.StringWriter;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.namespace.QName;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hadoop.classification.InterfaceStability;
 
-import com.thoughtworks.xstream.XStream;
-import com.thoughtworks.xstream.XStreamException;
-import com.thoughtworks.xstream.converters.basic.DateConverter;
-import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
-import com.thoughtworks.xstream.io.xml.CompactWriter;
-import com.thoughtworks.xstream.io.xml.XppDriver;
-
-import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.netlet.util.DTThrowable;
 
 /**
  * @displayName XmlParser
@@ -43,13 +42,12 @@ import com.datatorrent.api.Context;
 @InterfaceStability.Evolving
 public class XmlFormatter extends Formatter<String>
 {
-
-  private transient XStream xstream;
-
   protected String alias;
   protected String dateFormat;
   protected boolean prettyPrint;
 
+  private transient Marshaller marshaller;
+
   public XmlFormatter()
   {
     alias = null;
@@ -57,45 +55,32 @@ public class XmlFormatter extends Formatter<String>
   }
 
   @Override
-  public void activate(Context context)
+  public void setup(OperatorContext context)
   {
-    if (prettyPrint) {
-      xstream = new XStream();
-    } else {
-      xstream = new XStream(new XppDriver()
-      {
-        @Override
-        public HierarchicalStreamWriter createWriter(Writer out)
-        {
-          return new CompactWriter(out, getNameCoder());
-        }
-      });
-    }
-    if (alias != null) {
-      try {
-        xstream.alias(alias, clazz);
-      } catch (Throwable e) {
-        throw new RuntimeException("Unable find provided class");
-      }
-    }
-    if (dateFormat != null) {
-      xstream.registerConverter(new DateConverter(dateFormat, new String[] {}));
+    JAXBContext ctx;
+    try {
+      ctx = JAXBContext.newInstance(getClazz());
+      marshaller = ctx.createMarshaller();
+      marshaller.setProperty(Marshaller.JAXB_FRAGMENT, true);
+      marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, prettyPrint);
+    } catch (JAXBException e) {
+      DTThrowable.wrapIfChecked(e);
     }
   }
 
   @Override
-  public void deactivate()
-  {
-
-  }
-
-  @Override
   public String convert(Object tuple)
   {
     try {
-      return xstream.toXML(tuple);
-    } catch (XStreamException e) {
-      logger.debug("Error while converting tuple {} {} ",tuple,e.getMessage());
+      StringWriter writer = new StringWriter();
+      if (getAlias() != null) {
+        marshaller.marshal(new JAXBElement(new QName(getAlias()), tuple.getClass(), tuple), writer);
+      } else {
+        marshaller.marshal(new JAXBElement(new QName(getClazz().getSimpleName()), tuple.getClass(), tuple), writer);
+      }
+      return writer.toString();
+    } catch (Exception e) {
+      logger.debug("Error while converting tuple {} {} ", tuple, e.getMessage());
       return null;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7a1d0871/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java b/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java
index 3727d86..9e8ee0f 100644
--- a/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java
+++ b/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java
@@ -27,11 +27,10 @@ import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.ObjectReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceStability;
 
-import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.netlet.util.DTThrowable;
 
 /**
@@ -45,14 +44,14 @@ import com.datatorrent.netlet.util.DTThrowable;
  * @since 3.2.0
  */
 @InterfaceStability.Evolving
-public class JsonParser extends Parser<String>
+public class JsonParser extends Parser<String, String>
 {
 
   private transient ObjectReader reader;
   protected String dateFormat;
 
   @Override
-  public void activate(Context context)
+  public void setup(OperatorContext context)
   {
     try {
       ObjectMapper mapper = new ObjectMapper();
@@ -67,11 +66,6 @@ public class JsonParser extends Parser<String>
   }
 
   @Override
-  public void deactivate()
-  {
-  }
-
-  @Override
   public Object convert(String tuple)
   {
     try {
@@ -86,6 +80,12 @@ public class JsonParser extends Parser<String>
     return null;
   }
 
+  @Override
+  public String processErorrTuple(String input)
+  {
+    return input;
+  }
+
   /**
    * Get the date format
    * 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7a1d0871/library/src/main/java/com/datatorrent/lib/parser/Parser.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/parser/Parser.java b/library/src/main/java/com/datatorrent/lib/parser/Parser.java
index c9455e2..5bcc1c5 100644
--- a/library/src/main/java/com/datatorrent/lib/parser/Parser.java
+++ b/library/src/main/java/com/datatorrent/lib/parser/Parser.java
@@ -20,11 +20,11 @@ package com.datatorrent.lib.parser;
 
 import org.apache.hadoop.classification.InterfaceStability;
 
+import com.datatorrent.api.AutoMetric;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator.ActivationListener;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.lib.converter.Converter;
@@ -47,10 +47,15 @@ import com.datatorrent.lib.converter.Converter;
  * @since 3.2.0
  */
 @InterfaceStability.Evolving
-public abstract class Parser<INPUT> extends BaseOperator implements Converter<INPUT, Object>,
-    ActivationListener<Context>
+public abstract class Parser<INPUT, ERROROUT> extends BaseOperator implements Converter<INPUT, Object>
 {
   protected transient Class<?> clazz;
+  @AutoMetric
+  protected long errorTupleCount;
+  @AutoMetric
+  protected long emittedObjectCount;
+  @AutoMetric
+  protected long incomingTuplesCount;
 
   @OutputPortFieldAnnotation(schemaRequired = true)
   public transient DefaultOutputPort<Object> out = new DefaultOutputPort<Object>()
@@ -61,25 +66,46 @@ public abstract class Parser<INPUT> extends BaseOperator implements Converter<IN
     }
   };
 
-  @OutputPortFieldAnnotation(optional = true)
-  public transient DefaultOutputPort<INPUT> err = new DefaultOutputPort<INPUT>();
-
+  public transient DefaultOutputPort<ERROROUT> err = new DefaultOutputPort<ERROROUT>();
   public transient DefaultInputPort<INPUT> in = new DefaultInputPort<INPUT>()
   {
     @Override
     public void process(INPUT inputTuple)
     {
-      Object tuple = convert(inputTuple);
-      if (tuple == null && err.isConnected()) {
-        err.emit(inputTuple);
-        return;
-      }
-      if (out.isConnected()) {
-        out.emit(tuple);
-      }
+      incomingTuplesCount++;
+      processTuple(inputTuple);
     }
   };
 
+  public void processTuple(INPUT inputTuple)
+  {
+    Object tuple = convert(inputTuple);
+    if (tuple == null && err.isConnected()) {
+      errorTupleCount++;
+      err.emit(processErorrTuple(inputTuple));
+      return;
+    }
+    if (out.isConnected()) {
+      emittedObjectCount++;
+      out.emit(tuple);
+    }
+  }
+
+  public abstract ERROROUT processErorrTuple(INPUT input);
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    errorTupleCount = 0;
+    emittedObjectCount = 0;
+    incomingTuplesCount = 0;
+  }
+
+  @Override
+  public void endWindow()
+  {
+  }
+
   /**
    * Get the class that needs to be formatted
    * 
@@ -99,5 +125,4 @@ public abstract class Parser<INPUT> extends BaseOperator implements Converter<IN
   {
     this.clazz = clazz;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7a1d0871/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java b/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java
index 888837d..3d416e1 100644
--- a/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java
+++ b/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java
@@ -18,16 +18,33 @@
  */
 package com.datatorrent.lib.parser;
 
+import java.io.IOException;
+
+import javax.xml.XMLConstants;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import javax.xml.validation.Validator;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
+import org.w3c.dom.Document;
+import org.xml.sax.SAXException;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
-import com.thoughtworks.xstream.XStream;
-import com.thoughtworks.xstream.XStreamException;
-import com.thoughtworks.xstream.converters.basic.DateConverter;
-
-import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.util.ReusableStringReader;
+import com.datatorrent.netlet.util.DTThrowable;
 
 /**
  * Operator that converts XML string to Pojo <br>
@@ -44,99 +61,109 @@ import com.datatorrent.api.Context;
  * @since 3.2.0
  */
 @InterfaceStability.Evolving
-public class XmlParser extends Parser<String>
+public class XmlParser extends Parser<String, String>
 {
+  private String schemaXSDFile;
+  private transient Unmarshaller unmarshaller;
+  private transient Validator validator;
+  private ReusableStringReader reader = new ReusableStringReader();
+  public transient DefaultOutputPort<Document> parsedOutput = new DefaultOutputPort<Document>();
 
-  private transient XStream xstream;
-  protected String alias;
-  protected String dateFormats;
-
-  public XmlParser()
+  @Override
+  public Object convert(String tuple)
   {
-    alias = null;
-    dateFormats = null;
+    // This method is not invoked for XML parser
+    return null;
   }
 
   @Override
-  public void activate(Context context)
+  public void processTuple(String inputTuple)
   {
-    xstream = new XStream();
-    if (alias != null) {
+    try {
+      if (out.isConnected()) {
+        reader.open(inputTuple);
+        JAXBElement<?> output = unmarshaller.unmarshal(new StreamSource(reader), getClazz());
+        LOG.debug(output.getValue().toString());
+        emittedObjectCount++;
+        out.emit(output.getValue());
+      } else if (validator != null) {
+        validator.validate(new StreamSource(inputTuple));
+      }
+      if (parsedOutput.isConnected()) {
+        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+        DocumentBuilder builder;
+        try {
+          builder = factory.newDocumentBuilder();
+          Document doc = builder.parse(inputTuple);
+          parsedOutput.emit(doc);
+
+        } catch (Exception e) {
+          LOG.info("Failed to parse xml tuple {}, Exception = {} , StackTrace = {}", inputTuple, e, e.getStackTrace());
+          errorTupleCount++;
+          if (err.isConnected()) {
+            err.emit(inputTuple);
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.info("Failed to parse xml tuple {}, Exception = {}, StackTrace = {} ", inputTuple, e, e.getStackTrace());
+      errorTupleCount++;
+      if (err.isConnected()) {
+        err.emit(inputTuple);
+      }
+    } finally {
       try {
-        xstream.alias(alias, clazz);
-      } catch (Throwable e) {
-        throw new RuntimeException("Unable find provided class");
+        if (reader.isOpen()) {
+          reader.close();
+        }
+      } catch (IOException e) {
+        DTThrowable.wrapIfChecked(e);
       }
     }
-    if (dateFormats != null) {
-      String[] dateFormat = dateFormats.split(",");
-      xstream.registerConverter(new DateConverter(dateFormat[0], dateFormat));
-    }
   }
 
   @Override
-  public void deactivate()
+  public String processErorrTuple(String input)
   {
-
+    return input;
   }
 
   @Override
-  public Object convert(String tuple)
+  public void setup(com.datatorrent.api.Context.OperatorContext context)
   {
     try {
-      return xstream.fromXML(tuple);
-    } catch (XStreamException e) {
-      logger.debug("Error while converting tuple {} {}", tuple,e.getMessage());
-      return null;
+      JAXBContext ctx = JAXBContext.newInstance(getClazz());
+      unmarshaller = ctx.createUnmarshaller();
+      if (schemaXSDFile != null) {
+        Path filePath = new Path(schemaXSDFile);
+        Configuration configuration = new Configuration();
+        FileSystem fs = FileSystem.newInstance(filePath.toUri(), configuration);
+        FSDataInputStream inputStream = fs.open(filePath);
+
+        SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+        Schema schema = factory.newSchema(new StreamSource(inputStream));
+        unmarshaller.setSchema(schema);
+        validator = schema.newValidator();
+        fs.close();
+      }
+    } catch (SAXException e) {
+      DTThrowable.wrapIfChecked(e);
+    } catch (JAXBException e) {
+      DTThrowable.wrapIfChecked(e);
+    } catch (IOException e) {
+      DTThrowable.wrapIfChecked(e);
     }
   }
 
-  /**
-   * Gets the alias
-   * 
-   * @return alias.
-   */
-  public String getAlias()
-  {
-    return alias;
-  }
-
-  /**
-   * Sets the alias This maps to the root element of the XML string. If not
-   * specified, parser would expect the root element to be fully qualified name
-   * of the Pojo Class.
-   * 
-   * @param alias
-   *          .
-   */
-  public void setAlias(String alias)
+  public String getSchemaFile()
   {
-    this.alias = alias;
+    return schemaXSDFile;
   }
 
-  /**
-   * Gets the comma separated string of date formats e.g dd/mm/yyyy,dd-mmm-yyyy
-   * where first one would be considered default
-   * 
-   * @return dateFormats.
-   */
-  public String getDateFormats()
+  public void setSchemaFile(String schemaFile)
   {
-    return dateFormats;
+    this.schemaXSDFile = schemaFile;
   }
 
-  /**
-   * Sets the comma separated string of date formats e.g dd/mm/yyyy,dd-mmm-yyyy
-   * where first one would be considered default
-   * 
-   * @param dateFormats
-   *          .
-   */
-  public void setDateFormats(String dateFormats)
-  {
-    this.dateFormats = dateFormats;
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(XmlParser.class);
-
+  public static Logger LOG = LoggerFactory.getLogger(Parser.class);
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7a1d0871/library/src/main/java/com/datatorrent/lib/util/ReusableStringReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/util/ReusableStringReader.java b/library/src/main/java/com/datatorrent/lib/util/ReusableStringReader.java
index 8944dc7..3503043 100644
--- a/library/src/main/java/com/datatorrent/lib/util/ReusableStringReader.java
+++ b/library/src/main/java/com/datatorrent/lib/util/ReusableStringReader.java
@@ -92,4 +92,8 @@ public class ReusableStringReader extends Reader
     this.next = 0;
   }
 
+  public boolean isOpen()
+  {
+    return this.str != null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7a1d0871/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java b/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java
index bde544e..d9daf97 100644
--- a/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java
+++ b/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java
@@ -77,7 +77,6 @@ public class JsonFormatterTest
       TestUtils.setSink(operator.out, validDataSink);
       TestUtils.setSink(operator.err, invalidDataSink);
       operator.setup(null);
-      operator.activate(null);
 
       operator.beginWindow(0);
     }
@@ -120,7 +119,6 @@ public class JsonFormatterTest
     pojo.date = new DateTime().withYear(2015).withMonthOfYear(9).withDayOfMonth(15).toDate();
     operator.setDateFormat("dd-MM-yyyy");
     operator.setup(null);
-    operator.activate(null);
     operator.in.put(pojo);
     Assert.assertEquals(1, validDataSink.collectedTuples.size());
     Assert.assertEquals(0, invalidDataSink.collectedTuples.size());

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7a1d0871/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java b/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java
index 237ae5a..50ed3bd 100644
--- a/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java
+++ b/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java
@@ -18,7 +18,14 @@
  */
 package com.datatorrent.lib.formatter;
 
+import java.io.ByteArrayOutputStream;
+import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.logging.XMLFormatter;
+
+import javax.xml.bind.annotation.XmlType;
+import javax.xml.bind.annotation.adapters.XmlAdapter;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
 
 import org.joda.time.DateTime;
 import org.junit.Assert;
@@ -27,6 +34,11 @@ import org.junit.Test;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import com.datatorrent.lib.parser.XmlParser;
 import com.datatorrent.lib.testbench.CollectorTestSink;
 import com.datatorrent.lib.util.TestUtils;
 
@@ -64,6 +76,18 @@ public class XmlFormatterTest
     }
 
   }
+  @Test
+  public void testOperatorSerialization()
+  {
+    Kryo kryo = new Kryo();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    Output output = new Output(baos);
+    kryo.writeObject(output, this.operator);
+    output.close();
+    Input input = new Input(baos.toByteArray());
+    XMLFormatter tba1 = kryo.readObject(input, XMLFormatter.class);
+    Assert.assertNotNull("XML parser not null", tba1);
+  }
 
   @Test
   public void testPojoToXmlWithoutAlias()
@@ -75,13 +99,12 @@ public class XmlFormatterTest
     e.setDateOfJoining(new DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate());
 
     operator.setup(null);
-    operator.activate(null);
     operator.in.process(e);
     Assert.assertEquals(1, validDataSink.collectedTuples.size());
     Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    String expected = "<com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>" + "<name>john</name>"
+    String expected = "<EmployeeBean>" + "<name>john</name>"
         + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01</dateOfJoining>"
-        + "</com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>";
+        + "</EmployeeBean>";
     Assert.assertEquals(expected, validDataSink.collectedTuples.get(0));
   }
 
@@ -96,7 +119,6 @@ public class XmlFormatterTest
 
     operator.setAlias("EmployeeBean");
     operator.setup(null);
-    operator.activate(null);
     operator.in.process(e);
     Assert.assertEquals(1, validDataSink.collectedTuples.size());
     Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
@@ -117,12 +139,11 @@ public class XmlFormatterTest
     operator.setAlias("EmployeeBean");
     operator.setPrettyPrint(true);
     operator.setup(null);
-    operator.activate(null);
     operator.in.process(e);
     Assert.assertEquals(1, validDataSink.collectedTuples.size());
     Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    String expected = "<EmployeeBean>\n" + "  <name>john</name>\n" + "  <dept>cs</dept>\n" + "  <eid>1</eid>\n"
-        + "  <dateOfJoining>2015-01-01</dateOfJoining>\n" + "</EmployeeBean>";
+    String expected = "<EmployeeBean>\n" + "    <name>john</name>\n" + "    <dept>cs</dept>\n" + "    <eid>1</eid>\n"
+        + "    <dateOfJoining>2015-01-01</dateOfJoining>\n" + "</EmployeeBean>";
     Assert.assertEquals(expected, validDataSink.collectedTuples.get(0));
   }
 
@@ -140,18 +161,48 @@ public class XmlFormatterTest
     e.setAddress(address);
 
     operator.setup(null);
-    operator.activate(null);
     operator.in.process(e);
     System.out.println(validDataSink.collectedTuples.get(0));
     Assert.assertEquals(1, validDataSink.collectedTuples.size());
     Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    String expected = "<com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>" + "<name>john</name>"
+    String expected = "<EmployeeBean>" + "<name>john</name>"
         + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01</dateOfJoining>" + "<address>"
         + "<city>new york</city>" + "<country>US</country>" + "</address>"
-        + "</com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>";
+        + "</EmployeeBean>";
     Assert.assertEquals(expected, validDataSink.collectedTuples.get(0));
   }
+  
+  public static class DateAdapter extends XmlAdapter<String, Date>
+  {
+
+    private String dateFormatString = "yyyy-MM-dd";
+    private SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatString);
+
+    @Override
+    public String marshal(Date v) throws Exception
+    {
+      return dateFormat.format(v);
+    }
+
+    @Override
+    public Date unmarshal(String v) throws Exception
+    {
+      return dateFormat.parse(v);
+    }
+
+    public String getDateFormatString()
+    {
+      return dateFormatString;
+    }
+
+    public void setDateFormatString(String dateFormatString)
+    {
+      this.dateFormatString = dateFormatString;
+    }
+
+  }
 
+  @XmlType (propOrder={"name","dept","eid", "dateOfJoining", "address"})
   public static class EmployeeBean
   {
 
@@ -191,6 +242,7 @@ public class XmlFormatterTest
       this.eid = eid;
     }
 
+    @XmlJavaTypeAdapter(DateAdapter.class)
     public Date getDateOfJoining()
     {
       return dateOfJoining;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7a1d0871/library/src/test/java/com/datatorrent/lib/parser/JsonParserTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/parser/JsonParserTest.java b/library/src/test/java/com/datatorrent/lib/parser/JsonParserTest.java
index d091267..0400d23 100644
--- a/library/src/test/java/com/datatorrent/lib/parser/JsonParserTest.java
+++ b/library/src/test/java/com/datatorrent/lib/parser/JsonParserTest.java
@@ -77,7 +77,6 @@ public class JsonParserTest
       TestUtils.setSink(operator.out, validDataSink);
       TestUtils.setSink(operator.err, invalidDataSink);
       operator.setup(null);
-      operator.activate(null);
 
       operator.beginWindow(0);
     }
@@ -119,7 +118,6 @@ public class JsonParserTest
     String tuple = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":\"15-09-2015\"}";
     operator.setDateFormat("dd-MM-yyyy");
     operator.setup(null);
-    operator.activate(null);
     operator.in.put(tuple);
     Assert.assertEquals(1, validDataSink.collectedTuples.size());
     Assert.assertEquals(0, invalidDataSink.collectedTuples.size());

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7a1d0871/library/src/test/java/com/datatorrent/lib/parser/XmlParserTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/parser/XmlParserTest.java b/library/src/test/java/com/datatorrent/lib/parser/XmlParserTest.java
index a9504c4..5ce07f5 100644
--- a/library/src/test/java/com/datatorrent/lib/parser/XmlParserTest.java
+++ b/library/src/test/java/com/datatorrent/lib/parser/XmlParserTest.java
@@ -18,8 +18,13 @@
  */
 package com.datatorrent.lib.parser;
 
+import java.io.ByteArrayOutputStream;
+import java.text.SimpleDateFormat;
 import java.util.Date;
 
+import javax.xml.bind.annotation.adapters.XmlAdapter;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
 import org.joda.time.DateTime;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -27,6 +32,10 @@ import org.junit.Test;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
 import com.datatorrent.lib.testbench.CollectorTestSink;
 import com.datatorrent.lib.util.TestUtils;
 
@@ -48,7 +57,6 @@ public class XmlParserTest
       super.starting(description);
       operator = new XmlParser();
       operator.setClazz(EmployeeBean.class);
-      operator.setDateFormats("yyyy-MM-dd"); //setting default date pattern
       validDataSink = new CollectorTestSink<Object>();
       invalidDataSink = new CollectorTestSink<String>();
       TestUtils.setSink(operator.out, validDataSink);
@@ -65,6 +73,19 @@ public class XmlParserTest
   }
 
   @Test
+  public void testOperatorSerialization()
+  {
+    // Write the operator out with Kryo, then read it back from the same bytes.
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    Kryo serializer = new Kryo();
+    Output kryoOut = new Output(buffer);
+    serializer.writeObject(kryoOut, this.operator);
+    kryoOut.close();
+
+    Input kryoIn = new Input(buffer.toByteArray());
+    XmlParser restored = serializer.readObject(kryoIn, XmlParser.class);
+    Assert.assertNotNull("XML parser not null", restored);
+  }
+
+  @Test
   public void testXmlToPojoWithoutAlias()
   {
     String tuple = "<com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>" + "<name>john</name>"
@@ -72,7 +93,6 @@ public class XmlParserTest
         + "</com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>";
 
     operator.setup(null);
-    operator.activate(null);
     operator.in.process(tuple);
     Assert.assertEquals(1, validDataSink.collectedTuples.size());
     Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
@@ -92,12 +112,60 @@ public class XmlParserTest
   public void testXmlToPojoWithAliasDateFormat()
   {
     String tuple = "<EmployeeBean>" + "<name>john</name>" + "<dept>cs</dept>" + "<eid>1</eid>"
-        + "<dateOfJoining>2015-JAN-01</dateOfJoining>" + "</EmployeeBean>";
+        + "<dateOfJoiningYYYYMMDDFormat>2015-JAN-01</dateOfJoiningYYYYMMDDFormat>" + "</EmployeeBean>";
+
+    operator.setClazz(EmployeeBeanOverride.class);
+    operator.setup(null);
+    operator.in.process(tuple);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    Object obj = validDataSink.collectedTuples.get(0);
+    Assert.assertNotNull(obj);
+    Assert.assertEquals(EmployeeBeanOverride.class, obj.getClass());
+    EmployeeBeanOverride pojo = (EmployeeBeanOverride)obj;
+    Assert.assertEquals("john", pojo.getName());
+    Assert.assertEquals("cs", pojo.getDept());
+    Assert.assertEquals(1, pojo.getEid());
+    Assert.assertEquals(2015, new DateTime(pojo.getDateOfJoining()).getYear());
+    Assert.assertEquals(1, new DateTime(pojo.getDateOfJoining()).getMonthOfYear());
+    Assert.assertEquals(1, new DateTime(pojo.getDateOfJoining()).getDayOfMonth());
+  }
 
-    operator.setAlias("EmployeeBean");
-    operator.setDateFormats("yyyy-MM-dd,yyyy-MMM-dd");
+  @Test
+  public void testXSDValidation()
+  {
+    String xsdFile = "src/test/resources/employeeBean.xsd";
+    operator.setSchemaFile(xsdFile);
+    // Check without address field xsd validation fails
+    String tuple = "<EmployeeBean>" + "<name>john</name>" + "<dept>cs</dept>" + "<eid>1</eid>"
+        + "<dateOfJoining>2015-01-01</dateOfJoining>" + "</EmployeeBean>";
     operator.setup(null);
-    operator.activate(null);
+    operator.in.process(tuple);
+    Assert.assertEquals(0, validDataSink.collectedTuples.size());
+    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
+    Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
+
+    // Check extra fields present in xml, which are not part of xsd
+    tuple = "<EmployeeBean>" + "<name>john</name>"
+        + "<firstname>john</firstname>" //incorrect field name, xsd validation would fail
+        + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01</dateOfJoining>" + "<address>"
+        + "<city>new york</city>" + "<country>US</country>" + "</address>" + "</EmployeeBean>";
+
+    validDataSink.collectedTuples.clear();
+    invalidDataSink.collectedTuples.clear();
+    operator.in.process(tuple);
+    Assert.assertEquals(0, validDataSink.collectedTuples.size());
+    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
+    Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
+
+    // Check with all fields in xsd, POJO output should be received
+
+    tuple = "<EmployeeBean>" + "<name>john</name>" + "<dept>cs</dept>" + "<eid>1</eid>"
+        + "<dateOfJoining>2015-01-01</dateOfJoining>" + "<address>" + "<city>new york</city>" + "<country>US</country>"
+        + "</address>" + "</EmployeeBean>";
+
+    validDataSink.collectedTuples.clear();
+    invalidDataSink.collectedTuples.clear();
     operator.in.process(tuple);
     Assert.assertEquals(1, validDataSink.collectedTuples.size());
     Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
@@ -111,6 +179,9 @@ public class XmlParserTest
     Assert.assertEquals(2015, new DateTime(pojo.getDateOfJoining()).getYear());
     Assert.assertEquals(1, new DateTime(pojo.getDateOfJoining()).getMonthOfYear());
     Assert.assertEquals(1, new DateTime(pojo.getDateOfJoining()).getDayOfMonth());
+    Assert.assertEquals(Address.class, pojo.getAddress().getClass());
+    Assert.assertEquals("new york", pojo.getAddress().getCity());
+    Assert.assertEquals("US", pojo.getAddress().getCountry());
   }
 
   @Test
@@ -119,9 +190,7 @@ public class XmlParserTest
     String tuple = "<EmployeeBean>" + "<name>john</name>" + "<dept>cs</dept>" + "<eid>1</eid>"
         + "<dateOfJoining>2015-01-01</dateOfJoining>" + "</EmployeeBean>";
 
-    operator.setAlias("EmployeeBean");
     operator.setup(null);
-    operator.activate(null);
     operator.in.process(tuple);
     Assert.assertEquals(1, validDataSink.collectedTuples.size());
     Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
@@ -140,14 +209,11 @@ public class XmlParserTest
   @Test
   public void testXmlToPojoIncorrectXML()
   {
-    String tuple = "<EmployeeBean>"
-        + "<firstname>john</firstname>" //incorrect field name
-        + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01 00:00:00.00 IST</dateOfJoining>"
-        + "</EmployeeBean>";
+    String tuple = "<EmployeeBean>" + "<firstname>john</firstname>" //incorrect field name is ignored by JAXB 
+        + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01 00:00:00.00 IST</dateOfJoining>";
+    // + "</EmployeeBean>"; // Incorrect XML format
 
-    operator.setAlias("EmployeeBean");
     operator.setup(null);
-    operator.activate(null);
     operator.in.process(tuple);
     Assert.assertEquals(0, validDataSink.collectedTuples.size());
     Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
@@ -163,7 +229,6 @@ public class XmlParserTest
         + "</com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>";
 
     operator.setup(null);
-    operator.activate(null);
     operator.in.process(tuple);
     Assert.assertEquals(1, validDataSink.collectedTuples.size());
     Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
@@ -182,13 +247,59 @@ public class XmlParserTest
     Assert.assertEquals(1, new DateTime(pojo.getDateOfJoining()).getDayOfMonth());
   }
 
+  /**
+   * JAXB adapter that converts between {@link Date} values and their textual XML form using a
+   * {@link SimpleDateFormat} pattern. The default pattern is {@code yyyy-MMM-dd}, i.e. the month
+   * is written as text (for example {@code 2015-JAN-01}).
+   */
+  public static class DateAdapter extends XmlAdapter<String, Date>
+  {
+
+    private String dateFormatString = "yyyy-MMM-dd";
+    // Month text (MMM) is locale-sensitive; pin English names so the result does not depend on
+    // the JVM default locale of the machine running the tests.
+    private SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatString, java.util.Locale.ENGLISH);
+
+    /**
+     * Renders the date as text using the current pattern.
+     *
+     * @param v the date to format
+     * @return the formatted date string
+     * @throws Exception if the value cannot be formatted
+     */
+    @Override
+    public String marshal(Date v) throws Exception
+    {
+      return dateFormat.format(v);
+    }
+
+    /**
+     * Parses text written in the current pattern back into a date.
+     *
+     * @param v the text to parse
+     * @return the parsed date
+     * @throws Exception if the text does not match the current pattern
+     */
+    @Override
+    public Date unmarshal(String v) throws Exception
+    {
+      return dateFormat.parse(v);
+    }
+
+    /**
+     * @return the pattern currently used for formatting and parsing
+     */
+    public String getDateFormatString()
+    {
+      return dateFormatString;
+    }
+
+    /**
+     * Switches the adapter to a new pattern. The formatter is rebuilt here; previously only the
+     * pattern string was stored, so marshal/unmarshal kept using the old pattern.
+     *
+     * @param dateFormatString a {@link SimpleDateFormat} pattern
+     * @throws IllegalArgumentException if the pattern is invalid
+     */
+    public void setDateFormatString(String dateFormatString)
+    {
+      // Build the formatter first so that an invalid pattern leaves the adapter unchanged.
+      SimpleDateFormat replacement = new SimpleDateFormat(dateFormatString, java.util.Locale.ENGLISH);
+      this.dateFormatString = dateFormatString;
+      this.dateFormat = replacement;
+    }
+
+  }
+
+  /**
+   * {@link EmployeeBean} subclass used by testXmlToPojoWithAliasDateFormat, whose input carries the
+   * joining date in a {@code <dateOfJoiningYYYYMMDDFormat>} element (e.g. {@code 2015-JAN-01}).
+   */
+  public static class EmployeeBeanOverride extends EmployeeBean
+  {
+    /**
+     * Stores the given date in the inherited {@code dateOfJoining} field.
+     * NOTE(review): presumably JAXB binds the {@code dateOfJoiningYYYYMMDDFormat} element to this
+     * setter and converts the text with {@link DateAdapter} first - confirm against the JAXB
+     * property-discovery rules, since no matching getter is declared here.
+     *
+     * @param dateOfJoining the joining date to store
+     */
+    @XmlJavaTypeAdapter(DateAdapter.class)
+    public void setDateOfJoiningYYYYMMDDFormat(Date dateOfJoining)
+    {
+      this.dateOfJoining = dateOfJoining;
+    }
+  }
+
   public static class EmployeeBean
   {
 
+    @Override
+    public String toString()
+    {
+      // Bracketed "field=value" listing of every bean property, in declaration order.
+      StringBuilder text = new StringBuilder();
+      text.append("EmployeeBean [name=").append(name);
+      text.append(", dept=").append(dept);
+      text.append(", eid=").append(eid);
+      text.append(", dateOfJoining=").append(dateOfJoining);
+      text.append(", address=").append(address);
+      text.append("]");
+      return text.toString();
+    }
+
     private String name;
     private String dept;
     private int eid;
-    private Date dateOfJoining;
+    protected Date dateOfJoining;
     private Address address;
 
     public String getName()
@@ -245,6 +356,12 @@ public class XmlParserTest
   public static class Address
   {
 
+    @Override
+    public String toString()
+    {
+      // Bracketed "field=value" listing of the two address properties.
+      StringBuilder text = new StringBuilder();
+      text.append("Address [city=").append(city);
+      text.append(", country=").append(country);
+      text.append("]");
+      return text.toString();
+    }
+
     private String city;
     private String country;
 
@@ -268,5 +385,4 @@ public class XmlParserTest
       this.country = country;
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7a1d0871/library/src/test/resources/employeeBean.xsd
----------------------------------------------------------------------
diff --git a/library/src/test/resources/employeeBean.xsd b/library/src/test/resources/employeeBean.xsd
new file mode 100644
index 0000000..f481e61
--- /dev/null
+++ b/library/src/test/resources/employeeBean.xsd
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Schema for the EmployeeBean test documents (referenced from XmlParserTest.testXSDValidation as
+  src/test/resources/employeeBean.xsd). The root element is a sequence of name, dept, eid,
+  dateOfJoining and a nested address (city, country); no minOccurs/maxOccurs is given, so the
+  XSD defaults apply and each child must appear exactly once, in this order.
+-->
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
+ 
+  <xs:element name="EmployeeBean">
+  <xs:complexType>
+    <xs:sequence>
+      <xs:element name="name" type="xs:string"/>
+      <xs:element name="dept" type="xs:string"/>
+      <xs:element name="eid" type="xs:integer"/>
+      <xs:element name="dateOfJoining" type="xs:date"/>
+      <xs:element name="address">
+      	<xs:complexType>
+		    <xs:sequence>
+			    <xs:element name="city" type="xs:string"/>
+			    <xs:element name="country" type="xs:string"/>
+		    </xs:sequence>
+		</xs:complexType>
+      </xs:element>
+    </xs:sequence>
+  </xs:complexType>
+</xs:element>
+ 
+</xs:schema>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7a1d0871/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d4b2571..874f30f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,6 +62,7 @@
           <excludes combine.children="append">
             <exclude>**/src/test/resources/**/sample_logs/**</exclude>
             <exclude>src/test/resources/*.csv</exclude>
+            <exclude>src/test/resources/*.xsd</exclude>
             <exclude>**/src/main/resources/com/datatorrent/apps/logstream/**</exclude>
             <exclude>src/main/c/zmq_push/Makefile</exclude>
             <exclude>src/test/resources/com/datatorrent/contrib/romesyndication/*.rss</exclude>
@@ -78,6 +79,7 @@
           <excludes combine.children="append">
             <exclude>**/src/test/resources/**/sample_logs/**</exclude>
             <exclude>src/test/resources/*.csv</exclude>
+            <exclude>src/test/resources/*.xsd</exclude>
             <exclude>**/src/main/resources/com/datatorrent/apps/logstream/**</exclude>
             <exclude>src/main/c/zmq_push/Makefile</exclude>
             <exclude>src/test/resources/com/datatorrent/contrib/romesyndication/*.rss</exclude>