Posted to dev@apex.apache.org by ishark <gi...@git.apache.org> on 2015/12/16 01:07:34 UTC

[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

GitHub user ishark opened a pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137

    Added XSD validation to XML parsers and moved XML and JSON parsers to Malhar Library

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ishark/incubator-apex-malhar moveParsers

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-apex-malhar/pull/137.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #137
    
----
commit 23dabc414b9df2d3b0739933406ad11c2388f0b6
Author: ishark <is...@datatorrent.com>
Date:   2015-12-15T22:52:18Z

    Moving XML and JSON parsers & formatters to Malhar-lib and changing package names for parsers & formatters

commit 8462b18caf312d4003216b1c11eadb2302f16f76
Author: ishark <is...@datatorrent.com>
Date:   2015-12-16T00:03:09Z

    Remove ActivationListener interface from Parsers
    Add XSD validation to XML parser. Change XML parser to use JAXB instead of Xstream to support XSD validation.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48223914
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/XmlParser.java ---
    @@ -0,0 +1,139 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.parser;
    +
    +import java.io.IOException;
    +import java.io.StringReader;
    +
    +import javax.xml.XMLConstants;
    +import javax.xml.bind.JAXBContext;
    +import javax.xml.bind.JAXBElement;
    +import javax.xml.bind.JAXBException;
    +import javax.xml.bind.Unmarshaller;
    +import javax.xml.parsers.DocumentBuilder;
    +import javax.xml.parsers.DocumentBuilderFactory;
    +import javax.xml.transform.stream.StreamSource;
    +import javax.xml.validation.Schema;
    +import javax.xml.validation.SchemaFactory;
    +import javax.xml.validation.Validator;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.w3c.dom.Document;
    +import org.xml.sax.SAXException;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +
    +
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.netlet.util.DTThrowable;
    +
    +public class XmlParser extends Parser<String>
    --- End diff --
    
    Please add Javadoc for this class.



[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48463479
  
    --- Diff: library/src/test/java/com/datatorrent/lib/parser/XmlParserTest.java ---
    @@ -65,6 +73,19 @@ protected void finished(Description description)
       }
     
       @Test
    +  public void testOperatorSerialization()
    --- End diff --
    
    Curious to know the purpose of this test. Isn't it something that applies to all operators?



[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48319426
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/Parser.java ---
    @@ -61,25 +70,48 @@ public void setup(PortContext context)
         }
       };
     
    -  @OutputPortFieldAnnotation(optional = true)
       public transient DefaultOutputPort<INPUT> err = new DefaultOutputPort<INPUT>();
    --- End diff --
    
    I think the logs are a better place to capture the reason for error tuples, since they can include the stack trace as well.



[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48221880
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/XmlParser.java ---
    @@ -0,0 +1,139 @@
    +public class XmlParser extends Parser<String>
    +{
    +  private String schemaXSDFile;
    +  private transient Unmarshaller unmarshaller;
    +  private transient Validator validator;
    +
    +  public transient DefaultOutputPort<Document> parsedOutput = new DefaultOutputPort<Document>();
    +
    +  @Override
    +  public Object convert(String tuple)
    +  {
    +    // This method is not invoked for XML parser
    +    return null;
    +  }
    +
    +  @Override
    +  public void processTuple(String inputTuple)
    +  {
    +    try {
    +      if (out.isConnected()) {
    +        StringReader reader = new StringReader(inputTuple);
    +        JAXBElement<?> output = unmarshaller.unmarshal(new StreamSource(reader), getClazz());
    +        LOG.debug(output.getValue().toString());
    +        emittedObjectCount++;
    +        out.emit(output.getValue());
    +      } else {
    +        validator.validate(new StreamSource(inputTuple));
    +      }
    +      if (validatedOutput.isConnected()) {
    +        validatedOutput.emit(inputTuple);
    +      }
    +      if (parsedOutput.isConnected()) {
    +        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
    +        DocumentBuilder builder;
    +        try {
    +          builder = factory.newDocumentBuilder();
    +          Document doc = builder.parse(inputTuple);
    +          parsedOutput.emit(doc);
    +
    +        } catch (Exception e) {
    +          // It should not reach here, since inputTuple string is already validated.
    +        }
    +      }
    +    } catch (Exception e) {
    +      LOG.debug("Failed to parse xml tuple {}, Exception = {} ", inputTuple, e);
    +      errorTupleCount++;
    +      if (err.isConnected()) {
    +        err.emit(inputTuple);
    +      }
    +    }
    +  }
    +
    +  @Override
    +  public void setup(com.datatorrent.api.Context.OperatorContext context)
    +  {
    +    try {
    +      JAXBContext ctx = JAXBContext.newInstance(getClazz());
    +      unmarshaller = ctx.createUnmarshaller();
    +      if (schemaXSDFile != null) {
    +        Path filePath = new Path(schemaXSDFile);
    --- End diff --
    
    Can we move the file-reading part into a utility class with a method that takes a URI and returns the file contents as a stream or string, so that others can reuse the code?
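A minimal sketch of the suggested utility, assuming a JDK-only implementation: the class name UriContents and its methods are hypothetical, and Paths.get(URI) only handles file: URIs. Inside the operator, Hadoop's FileSystem (as used in the diff) would likely be the better backing so that HDFS paths also work.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

/**
 * Hypothetical helper: given a URI, return the file contents as a stream
 * or as a string. JDK-only, so it supports file: URIs.
 */
final class UriContents {
  private UriContents() {}

  /** Opens a stream over the file; the caller must close it. */
  static InputStream openStream(URI uri) throws IOException {
    return Files.newInputStream(Paths.get(uri));
  }

  /** Reads the whole file as UTF-8 text and closes the stream afterwards. */
  static String readString(URI uri) throws IOException {
    try (InputStream in = openStream(uri)) {
      return new String(readAll(in), StandardCharsets.UTF_8);
    }
  }

  // Copies the stream into memory in fixed-size chunks.
  private static byte[] readAll(InputStream in) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    byte[] chunk = new byte[8192];
    int n;
    while ((n = in.read(chunk)) != -1) {
      buf.write(chunk, 0, n);
    }
    return buf.toByteArray();
  }
}
```

Returning the stream from openStream leaves closing to the caller, while readString owns and closes its own stream.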




[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48221972
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/XmlParser.java ---
    @@ -0,0 +1,139 @@
    +public class XmlParser extends Parser<String>
    +{
    +  private String schemaXSDFile;
    +  private transient Unmarshaller unmarshaller;
    +  private transient Validator validator;
    +
    +  public transient DefaultOutputPort<Document> parsedOutput = new DefaultOutputPort<Document>();
    +
    +  @Override
    +  public Object convert(String tuple)
    +  {
    +    // This method is not invoked for XML parser
    +    return null;
    +  }
    +
    +  @Override
    +  public void processTuple(String inputTuple)
    +  {
    +    try {
    +      if (out.isConnected()) {
    +        StringReader reader = new StringReader(inputTuple);
    +        JAXBElement<?> output = unmarshaller.unmarshal(new StreamSource(reader), getClazz());
    +        LOG.debug(output.getValue().toString());
    +        emittedObjectCount++;
    +        out.emit(output.getValue());
    +      } else {
    +        validator.validate(new StreamSource(inputTuple));
    +      }
    +      if (validatedOutput.isConnected()) {
    +        validatedOutput.emit(inputTuple);
    +      }
    +      if (parsedOutput.isConnected()) {
    +        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
    +        DocumentBuilder builder;
    +        try {
    +          builder = factory.newDocumentBuilder();
    +          Document doc = builder.parse(inputTuple);
    +          parsedOutput.emit(doc);
    +
    +        } catch (Exception e) {
    +          // It should not reach here, since inputTuple string is already validated.
    +        }
    +      }
    +    } catch (Exception e) {
    +      LOG.debug("Failed to parse xml tuple {}, Exception = {} ", inputTuple, e);
    +      errorTupleCount++;
    +      if (err.isConnected()) {
    +        err.emit(inputTuple);
    +      }
    +    }
    +  }
    +
    +  @Override
    +  public void setup(com.datatorrent.api.Context.OperatorContext context)
    +  {
    +    try {
    +      JAXBContext ctx = JAXBContext.newInstance(getClazz());
    +      unmarshaller = ctx.createUnmarshaller();
    +      if (schemaXSDFile != null) {
    +        Path filePath = new Path(schemaXSDFile);
    +        Configuration configuration = new Configuration();
    +        FileSystem fs = FileSystem.newInstance(filePath.toUri(), configuration);
    +        FSDataInputStream inputStream = fs.open(filePath);
    --- End diff --
    
    Is fs being closed anywhere?
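For context: Hadoop's FileSystem.newInstance always returns a new, uncached FileSystem object, so the caller is responsible for closing it, and both FileSystem and FSDataInputStream implement Closeable. A try-with-resources block would close them even when schema parsing fails. The sketch below shows the same pattern with JDK classes only, so it runs without Hadoop on the classpath; SchemaLoader is a hypothetical name.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import org.xml.sax.SAXException;

/** Hypothetical loader that compiles an XSD and always releases the stream. */
final class SchemaLoader {
  private SchemaLoader() {}

  static Schema load(Path xsd) throws IOException, SAXException {
    SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
    // try-with-resources closes the stream even if newSchema throws.
    // With Hadoop, the FileSystem and FSDataInputStream could both be
    // declared in the same resource list so that both are closed.
    try (InputStream in = Files.newInputStream(xsd)) {
      return factory.newSchema(new StreamSource(in));
    }
  }
}
```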



[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48301784
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/XmlParser.java ---
    @@ -0,0 +1,139 @@
    +public class XmlParser extends Parser<String>
    +{
    +  private String schemaXSDFile;
    +  private transient Unmarshaller unmarshaller;
    +  private transient Validator validator;
    +
    +  public transient DefaultOutputPort<Document> parsedOutput = new DefaultOutputPort<Document>();
    +
    +  @Override
    +  public Object convert(String tuple)
    +  {
    +    // This method is not invoked for XML parser
    +    return null;
    +  }
    +
    +  @Override
    +  public void processTuple(String inputTuple)
    +  {
    +    try {
    +      if (out.isConnected()) {
    +        StringReader reader = new StringReader(inputTuple);
    +        JAXBElement<?> output = unmarshaller.unmarshal(new StreamSource(reader), getClazz());
    +        LOG.debug(output.getValue().toString());
    +        emittedObjectCount++;
    +        out.emit(output.getValue());
    +      } else {
    +        validator.validate(new StreamSource(inputTuple));
    --- End diff --
    
    Unmarshalling performs validation as well, so there is no need to validate explicitly in the POJO case.
    Yes, validation should only be applied if a schema file is specified. Will fix that.
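A minimal sketch of the "validate only if a schema is configured" behavior, using only the JDK's javax.xml APIs; XmlTupleHandler is a hypothetical name. It also works around a pitfall visible in the diff: new StreamSource(String) treats its argument as a system ID (a URL), and DocumentBuilder.parse(String) treats its argument as a URI, so the tuple's XML content has to be wrapped in a StringReader.

```java
import java.io.IOException;
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.Validator;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;

/** Hypothetical helper; schema may be null when no XSD file is configured. */
final class XmlTupleHandler {
  private final Validator validator; // null means "no schema configured"
  private final DocumentBuilder builder;

  XmlTupleHandler(Schema schema) throws ParserConfigurationException {
    this.validator = (schema == null) ? null : schema.newValidator();
    this.builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
  }

  /** Validates only when a schema was supplied; throws SAXException on invalid input. */
  void validateIfConfigured(String tuple) throws SAXException, IOException {
    if (validator != null) {
      // Wrap the content: StreamSource(String) would treat it as a system ID.
      validator.validate(new StreamSource(new StringReader(tuple)));
    }
  }

  /** Parses the tuple content into a DOM Document. */
  Document toDocument(String tuple) throws SAXException, IOException {
    // Wrap the content: DocumentBuilder.parse(String) would treat it as a URI.
    return builder.parse(new InputSource(new StringReader(tuple)));
  }
}
```

Validator and DocumentBuilder are not thread-safe, which is acceptable here because an operator processes tuples on a single thread.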



[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48425318
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/Parser.java ---
    @@ -61,25 +70,48 @@ public void setup(PortContext context)
         }
       };
     
    -  @OutputPortFieldAnnotation(optional = true)
       public transient DefaultOutputPort<INPUT> err = new DefaultOutputPort<INPUT>();
    --- End diff --
    
    Hi @shubham-pathak22, I have already changed the type of the error port to ERROROUT in the latest diff. However, the concrete implementations still emit the input tuple itself on error.



[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48321570
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/Parser.java ---
    @@ -47,10 +47,19 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public abstract class Parser<INPUT> extends BaseOperator implements Converter<INPUT, Object>,
    -    ActivationListener<Context>
    +public abstract class Parser<INPUT> extends BaseOperator implements Converter<INPUT, Object>
     {
       protected transient Class<?> clazz;
    +  @AutoMetric
    +  protected long errorTupleCount;
    +  @AutoMetric
    +  protected long emittedObjectCount;
    +  @AutoMetric
    +  protected long rollingPercentValidTuples;
    +  @AutoMetric
    +  protected long rollingPercentErrorTuples;
    +  @AutoMetric
    +  protected long incomingTuplesCount;
     
       @OutputPortFieldAnnotation(schemaRequired = true)
    --- End diff --
    
    Well, what I meant is: if users don't want POJOs, they won't connect to this port and won't set the class property either. Basically, users won't provide a class if POJOs aren't required, but because of schemaRequired = true, validation would fail.
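One way to express this, sketched in plain Java without the Apex API (PojoPortCheck is a hypothetical name): require the class only when the POJO port is actually connected, much as processTuple in the diff already checks out.isConnected() before unmarshalling.

```java
/**
 * Hypothetical setup-time check: the POJO class is only mandatory
 * when the POJO output port is connected.
 */
final class PojoPortCheck {
  private PojoPortCheck() {}

  static void check(boolean pojoPortConnected, Class<?> clazz) {
    if (pojoPortConnected && clazz == null) {
      throw new IllegalStateException(
          "POJO output port is connected but no class was configured");
    }
  }
}
```

A user who only needs the validated string or the parsed Document would then pass the check without configuring any class.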



[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48381493
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/Parser.java ---
    @@ -61,25 +70,48 @@ public void setup(PortContext context)
         }
       };
     
    -  @OutputPortFieldAnnotation(optional = true)
       public transient DefaultOutputPort<INPUT> err = new DefaultOutputPort<INPUT>();
    --- End diff --
    
    Would it be good to make the default behavior emitting the same input to the error port and logging the error?
    If a transformation is needed, as you suggested, users can add or override an additional port for processed errors, or connect another operator to the default error port for error processing, whichever suits the specific scenario.



[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48103837
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/Parser.java ---
    @@ -47,10 +47,19 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public abstract class Parser<INPUT> extends BaseOperator implements Converter<INPUT, Object>,
    -    ActivationListener<Context>
    +public abstract class Parser<INPUT> extends BaseOperator implements Converter<INPUT, Object>
     {
       protected transient Class<?> clazz;
    +  @AutoMetric
    +  protected long errorTupleCount;
    +  @AutoMetric
    +  protected long emittedObjectCount;
    +  @AutoMetric
    +  protected long rollingPercentValidTuples;
    +  @AutoMetric
    +  protected long rollingPercentErrorTuples;
    +  @AutoMetric
    +  protected long incomingTuplesCount;
     
       @OutputPortFieldAnnotation(schemaRequired = true)
    --- End diff --
    
    schemaRequired=true is very restrictive. Suppose the XmlParser operator is used and I don't want POJOs, so I do not connect to this port. Because of schemaRequired=true, the application will still fail during verification.




[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48221312
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/XmlParser.java ---
    @@ -0,0 +1,139 @@
    +public class XmlParser extends Parser<String>
    +{
    +  private String schemaXSDFile;
    +  private transient Unmarshaller unmarshaller;
    +  private transient Validator validator;
    +
    +  public transient DefaultOutputPort<Document> parsedOutput = new DefaultOutputPort<Document>();
    +
    +  @Override
    +  public Object convert(String tuple)
    +  {
    +    // This method is not invoked for XML parser
    +    return null;
    +  }
    +
    +  @Override
    +  public void processTuple(String inputTuple)
    +  {
    +    try {
    +      if (out.isConnected()) {
    +        StringReader reader = new StringReader(inputTuple);
    --- End diff --
    
    Can we consider using ReusableStringReader here?
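A hypothetical stand-in to illustrate the idea (Malhar's own ReusableStringReader may differ in name and API): a single Reader instance is reset with each tuple instead of allocating a new StringReader per tuple. close() is a no-op so that a parser closing the reader does not prevent reuse, and the instance is not thread-safe, which matches an operator's single-threaded processTuple.

```java
import java.io.Reader;

/** Hypothetical reader that is re-pointed at each new tuple. */
final class ResettableStringReader extends Reader {
  private String str = "";
  private int next;

  /** Points the reader at a new string and rewinds it. */
  ResettableStringReader reset(String s) {
    this.str = s;
    this.next = 0;
    return this;
  }

  @Override
  public int read(char[] cbuf, int off, int len) {
    if (len == 0) {
      return 0;
    }
    if (next >= str.length()) {
      return -1; // end of the current string
    }
    int n = Math.min(len, str.length() - next);
    str.getChars(next, next + n, cbuf, off);
    next += n;
    return n;
  }

  @Override
  public void close() {
    // Nothing to release; the instance stays usable after reset().
  }
}
```

With it, the unmarshal call could take new StreamSource(reader.reset(inputTuple)), where reader would be a transient field created in setup().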



[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48382910
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/Parser.java ---
    @@ -61,25 +70,48 @@ public void setup(PortContext context)
         }
       };
     
    -  @OutputPortFieldAnnotation(optional = true)
       public transient DefaultOutputPort<INPUT> err = new DefaultOutputPort<INPUT>();
    --- End diff --
    
    @ishark, please treat my comment as just a suggestion. I don't have a strong opinion here. These things evolve as we see more use cases.



[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-apex-malhar/pull/137



[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48321356
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/Parser.java ---
    @@ -61,25 +70,48 @@ public void setup(PortContext context)
         }
       };
     
    -  @OutputPortFieldAnnotation(optional = true)
       public transient DefaultOutputPort<INPUT> err = new DefaultOutputPort<INPUT>();
    --- End diff --
    
    Certainly, logs can contain the stack trace. But a short error message on the error port, along with the error tuple, would let users find validation errors quickly by connecting to the error port instead of digging through the logs.
    What I suggest is that the port should at least have a provision to send an error message.
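The following is a minimal sketch of the "short error message" idea, assuming plain `javax.xml.validation` (not the operator's actual code). With the default error handler, a `Validator` reports schema violations by throwing `SAXParseException`, which carries a line and column. The sketch condenses that into one line that could travel with the error tuple. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.SchemaFactory;
import javax.xml.validation.Validator;
import org.xml.sax.SAXException;
import org.xml.sax.SAXParseException;

// Hypothetical helper: reduce an XSD validation failure to a one-line message
// that could be emitted alongside the failing tuple instead of a stack trace.
class ShortValidationMessage
{
  // Compile an XSD held in a String; wraps the checked exception for brevity.
  static Validator validatorFor(String xsd)
  {
    try {
      return SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
          .newSchema(new StreamSource(new StringReader(xsd)))
          .newValidator();
    } catch (SAXException e) {
      throw new IllegalArgumentException("Invalid XSD", e);
    }
  }

  // Returns null when the tuple is valid, otherwise a short "line, column: message" string.
  static String validate(Validator validator, String xml)
  {
    try {
      // StreamSource(String) would treat the tuple as a system ID, so wrap it in a reader.
      validator.validate(new StreamSource(new StringReader(xml)));
      return null;
    } catch (SAXParseException e) {
      return "line " + e.getLineNumber() + ", column " + e.getColumnNumber() + ": " + e.getMessage();
    } catch (SAXException | IOException e) {
      return e.getMessage();
    }
  }
}
```

The short string could be paired with the original tuple on an error port, while the full exception still goes to the log at debug level.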



[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r47726307
  
    --- Diff: library/src/main/java/com/datatorrent/lib/formatter/XmlFormatter.java ---
    @@ -57,45 +55,32 @@ public XmlFormatter()
       }
     
       @Override
    -  public void activate(Context context)
    +  public void setup(OperatorContext context)
       {
    -    if (prettyPrint) {
    -      xstream = new XStream();
    -    } else {
    -      xstream = new XStream(new XppDriver()
    -      {
    -        @Override
    -        public HierarchicalStreamWriter createWriter(Writer out)
    -        {
    -          return new CompactWriter(out, getNameCoder());
    -        }
    -      });
    -    }
    -    if (alias != null) {
    -      try {
    -        xstream.alias(alias, clazz);
    -      } catch (Throwable e) {
    -        throw new RuntimeException("Unable find provided class");
    -      }
    -    }
    -    if (dateFormat != null) {
    -      xstream.registerConverter(new DateConverter(dateFormat, new String[] {}));
    +    JAXBContext ctx;
    +    try {
    +      ctx = JAXBContext.newInstance(getClazz());
    +      marshaller = ctx.createMarshaller();
    +      marshaller.setProperty(Marshaller.JAXB_FRAGMENT, true);
    +      marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, prettyPrint);
    +    } catch (JAXBException e) {
    +      DTThrowable.wrapIfChecked(e);
         }
       }
     
       @Override
    -  public void deactivate()
    -  {
    -
    -  }
    -
    -  @Override
       public String convert(Object tuple)
       {
         try {
    -      return xstream.toXML(tuple);
    -    } catch (XStreamException e) {
    -      logger.debug("Error while converting tuple {} {} ",tuple,e.getMessage());
    +      StringWriter writer = new StringWriter();
    +      if (getAlias() != null) {
    +        marshaller.marshal(new JAXBElement(new QName(getAlias()), tuple.getClass(), tuple), writer);
    +      } else {
    +        marshaller.marshal(new JAXBElement(new QName(getClazz().getSimpleName()), tuple.getClass(), tuple), writer);
    +      }
    +      return writer.toString();
    +    } catch (Exception e) {
    +      logger.debug("Error while converting tuple {} {} ", tuple, e.getMessage());
           return null;
    --- End diff --
    
    This is because we do not want the operator to stop if one of the input tuples is incorrect. The superclass emits the tuple to the error port if conversion fails.
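The "return null instead of throwing" contract described above can be sketched in plain Java as follows. The class name is hypothetical, and two lists stand in for the `out` and `err` ports. `convert()` swallows its exception and returns null, and the base logic routes the original input to the error side, so the operator keeps running.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch: a failed conversion yields null, and the original input
// is routed to the error side instead of an exception stopping the operator.
class NullRoutingConverter<IN, OUT>
{
  final List<OUT> out = new ArrayList<>(); // stands in for the "out" port
  final List<IN> err = new ArrayList<>();  // stands in for the "err" port
  private final Function<IN, OUT> convert;

  NullRoutingConverter(Function<IN, OUT> convert)
  {
    this.convert = convert;
  }

  void process(IN tuple)
  {
    OUT result = convert.apply(tuple);
    if (result == null) {
      err.add(tuple);
    } else {
      out.add(result);
    }
  }

  // Mirrors the formatter's try/catch: any conversion failure becomes null.
  static Integer safeParseInt(String s)
  {
    try {
      return Integer.valueOf(s.trim());
    } catch (NumberFormatException e) {
      return null;
    }
  }
}
```

The trade-off is that null cannot be a legitimate conversion result, which holds for the XML formatter because a successful marshal always produces a string.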



[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48301231
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/Parser.java ---
    @@ -61,25 +70,48 @@ public void setup(PortContext context)
         }
       };
     
    -  @OutputPortFieldAnnotation(optional = true)
       public transient DefaultOutputPort<INPUT> err = new DefaultOutputPort<INPUT>();
    -
    +  public transient DefaultOutputPort<INPUT> validatedOutput = new DefaultOutputPort<INPUT>();
    --- End diff --
    
    Removed it.



[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r47726390
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/XmlParser.java ---
    @@ -0,0 +1,139 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.parser;
    +
    +import java.io.IOException;
    +import java.io.StringReader;
    +
    +import javax.xml.XMLConstants;
    +import javax.xml.bind.JAXBContext;
    +import javax.xml.bind.JAXBElement;
    +import javax.xml.bind.JAXBException;
    +import javax.xml.bind.Unmarshaller;
    +import javax.xml.parsers.DocumentBuilder;
    +import javax.xml.parsers.DocumentBuilderFactory;
    +import javax.xml.transform.stream.StreamSource;
    +import javax.xml.validation.Schema;
    +import javax.xml.validation.SchemaFactory;
    +import javax.xml.validation.Validator;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.w3c.dom.Document;
    +import org.xml.sax.SAXException;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +
    +
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.netlet.util.DTThrowable;
    +
    +public class XmlParser extends Parser<String>
    +{
    +  private String schemaXSDFile;
    +  private transient Unmarshaller unmarshaller;
    +  private transient Validator validator;
    +
    +  public transient DefaultOutputPort<Document> parsedOutput = new DefaultOutputPort<Document>();
    +
    +  @Override
    +  public Object convert(String tuple)
    +  {
    +    // This method is not invoked for XML parser
    +    return null;
    +  }
    +
    +  @Override
    +  public void processTuple(String inputTuple)
    +  {
    +    try {
    +      if (out.isConnected()) {
    +        StringReader reader = new StringReader(inputTuple);
    +        JAXBElement<?> output = unmarshaller.unmarshal(new StreamSource(reader), getClazz());
    +        LOG.debug(output.getValue().toString());
    +        emittedObjectCount++;
    +        out.emit(output.getValue());
    +      } else {
    +        validator.validate(new StreamSource(inputTuple));
    +      }
    +      if (validatedOutput.isConnected()) {
    +        validatedOutput.emit(inputTuple);
    +      }
    +      if (parsedOutput.isConnected()) {
    +        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
    +        DocumentBuilder builder;
    +        try {
    +          builder = factory.newDocumentBuilder();
    +          Document doc = builder.parse(inputTuple);
    +          parsedOutput.emit(doc);
    +
    +        } catch (Exception e) {
    +          e.printStackTrace();
    --- End diff --
    
    Removed the print statement. The input is already validated before it is parsed, so execution should not reach the catch block.
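Below is a minimal sketch of building the DOM `Document` for `parsedOutput` from an in-memory tuple. It assumes only the standard `javax.xml.parsers` API; the class name is hypothetical. One detail matters here: `DocumentBuilder.parse(String)` interprets its argument as a URI, not as XML text. The sketch therefore wraps the tuple in an `InputSource` backed by a `StringReader`, and it reports the supposedly unreachable failure instead of printing it.

```java
import java.io.IOException;
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;

// Hypothetical helper: build a DOM Document from an XML tuple held in memory.
class DomFromTuple
{
  private final DocumentBuilder builder;

  DomFromTuple()
  {
    try {
      // Create the builder once (e.g. in setup()) rather than once per tuple.
      builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
    } catch (ParserConfigurationException e) {
      throw new IllegalStateException(e);
    }
  }

  Document parse(String xml)
  {
    try {
      // parse(String) would treat the tuple as a URI; parse the text itself instead.
      return builder.parse(new InputSource(new StringReader(xml)));
    } catch (SAXException | IOException e) {
      // Should be unreachable for tuples that already passed XSD validation,
      // but surface it rather than printing the stack trace.
      throw new IllegalArgumentException("Unparseable tuple: " + e.getMessage(), e);
    }
  }
}
```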



[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48379752
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/Parser.java ---
    @@ -61,25 +70,48 @@ public void setup(PortContext context)
         }
       };
     
    -  @OutputPortFieldAnnotation(optional = true)
       public transient DefaultOutputPort<INPUT> err = new DefaultOutputPort<INPUT>();
    -
    +  public transient DefaultOutputPort<INPUT> validatedOutput = new DefaultOutputPort<INPUT>();
       public transient DefaultInputPort<INPUT> in = new DefaultInputPort<INPUT>()
       {
         @Override
         public void process(INPUT inputTuple)
         {
    -      Object tuple = convert(inputTuple);
    -      if (tuple == null && err.isConnected()) {
    -        err.emit(inputTuple);
    -        return;
    -      }
    -      if (out.isConnected()) {
    -        out.emit(tuple);
    -      }
    +      incomingTuplesCount++;
    +      processTuple(inputTuple);
         }
       };
     
    +  public void processTuple(INPUT inputTuple)
    +  {
    +    Object tuple = convert(inputTuple);
    +    if (tuple == null && err.isConnected()) {
    +      errorTupleCount++;
    +      err.emit(inputTuple);
    +      return;
    +    }
    +    if (out.isConnected()) {
    +      emittedObjectCount++;
    +      out.emit(tuple);
    +    }
    +  }
    +
    +  @Override
    +  public void beginWindow(long windowId)
    +  {
    +    errorTupleCount = 0;
    +    emittedObjectCount = 0;
    +    rollingPercentValidTuples = 0;
    +    rollingPercentErrorTuples = 0;
    +  }
    +
    +  @Override
    +  public void endWindow()
    +  {
    --- End diff --
    
    AppDataTracker supports FIRST and LAST as well. The application developer can provide a dimension schema that aggregates the percentage metrics with just FIRST/LAST.



[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48301527
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/Parser.java ---
    @@ -61,25 +70,48 @@ public void setup(PortContext context)
         }
       };
     
    -  @OutputPortFieldAnnotation(optional = true)
       public transient DefaultOutputPort<INPUT> err = new DefaultOutputPort<INPUT>();
    -
    +  public transient DefaultOutputPort<INPUT> validatedOutput = new DefaultOutputPort<INPUT>();
       public transient DefaultInputPort<INPUT> in = new DefaultInputPort<INPUT>()
       {
         @Override
         public void process(INPUT inputTuple)
         {
    -      Object tuple = convert(inputTuple);
    -      if (tuple == null && err.isConnected()) {
    -        err.emit(inputTuple);
    -        return;
    -      }
    -      if (out.isConnected()) {
    -        out.emit(tuple);
    -      }
    +      incomingTuplesCount++;
    +      processTuple(inputTuple);
         }
       };
     
    +  public void processTuple(INPUT inputTuple)
    +  {
    +    Object tuple = convert(inputTuple);
    +    if (tuple == null && err.isConnected()) {
    +      errorTupleCount++;
    +      err.emit(inputTuple);
    +      return;
    +    }
    +    if (out.isConnected()) {
    +      emittedObjectCount++;
    +      out.emit(tuple);
    +    }
    +  }
    +
    +  @Override
    +  public void beginWindow(long windowId)
    +  {
    +    errorTupleCount = 0;
    +    emittedObjectCount = 0;
    +    rollingPercentValidTuples = 0;
    +    rollingPercentErrorTuples = 0;
    +  }
    +
    +  @Override
    +  public void endWindow()
    +  {
    --- End diff --
    
    Valid point. I added these because capturing percentage metrics was also mentioned. I will check what the best way to add them is. I think max and avg still hold for percentages; sum is probably not relevant.
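The `endWindow()` body is elided in the quoted diff. The following is therefore only a hypothetical sketch of how per-window percentages could be derived from the raw counters; the class and field names are illustrative. Two choices differ from the quoted code. First, the percentages are `double`, because the quoted `rollingPercent*` fields are `long` and integer division would truncate them. Second, `incomingTuplesCount` is reset in `beginWindow()`, which the quoted `beginWindow()` does not do, so that each ratio is relative to the current window.

```java
// Hypothetical sketch: per-window percentages computed from the window's counters.
class WindowMetrics
{
  long incomingTuplesCount;
  long errorTupleCount;
  long emittedObjectCount;
  double percentValidTuples;
  double percentErrorTuples;

  void beginWindow()
  {
    incomingTuplesCount = 0;
    errorTupleCount = 0;
    emittedObjectCount = 0;
    percentValidTuples = 0;
    percentErrorTuples = 0;
  }

  void record(boolean valid)
  {
    incomingTuplesCount++;
    if (valid) {
      emittedObjectCount++;
    } else {
      errorTupleCount++;
    }
  }

  void endWindow()
  {
    // Guard against empty windows to avoid dividing by zero.
    if (incomingTuplesCount > 0) {
      percentValidTuples = 100.0 * emittedObjectCount / incomingTuplesCount;
      percentErrorTuples = 100.0 * errorTupleCount / incomingTuplesCount;
    }
  }
}
```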



[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48301195
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/Parser.java ---
    @@ -47,10 +47,19 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public abstract class Parser<INPUT> extends BaseOperator implements Converter<INPUT, Object>,
    -    ActivationListener<Context>
    +public abstract class Parser<INPUT> extends BaseOperator implements Converter<INPUT, Object>
     {
       protected transient Class<?> clazz;
    +  @AutoMetric
    +  protected long errorTupleCount;
    +  @AutoMetric
    +  protected long emittedObjectCount;
    +  @AutoMetric
    +  protected long rollingPercentValidTuples;
    +  @AutoMetric
    +  protected long rollingPercentErrorTuples;
    +  @AutoMetric
    +  protected long incomingTuplesCount;
     
       @OutputPortFieldAnnotation(schemaRequired = true)
    --- End diff --
    
    This only applies to the POJO output port. It is another way to set the clazz attribute on the operator: set the Context.PortContext.TUPLE_CLASS attribute to the expected POJO output class. If the POJO output port is not connected, this check will not fail.



[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48507863
  
    --- Diff: library/src/test/java/com/datatorrent/lib/parser/XmlParserTest.java ---
    @@ -65,6 +73,19 @@ protected void finished(Description description)
       }
     
       @Test
    +  public void testOperatorSerialization()
    --- End diff --
    
    @shubham-pathak22, this test only checks whether the operator is Kryo-serializable. If an operator's tests run a DAG, serialization is validated automatically. However, all the tests for this operator call process directly on the operator's input port, so serialization was never checked; hence this test.
    But yes, @chandnisingh mentioned there could be a utility for this test going forward.
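The idea behind the test is a serialization round trip. Apex checkpoints operators with Kryo, but the sketch below uses plain `java.io` serialization purely as a dependency-free stand-in. It shows that configuration fields survive the round trip, while `transient` runtime helpers (such as the unmarshaller and validator) come back as null and must be recreated in `setup()`. `FakeXmlParser` is a hypothetical operator shape, not the real class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Stand-in round trip using java.io serialization (Apex itself uses Kryo).
class RoundTripCheck
{
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T obj)
  {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(obj);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new AssertionError("Operator is not serializable: " + e, e);
    }
  }
}

// Hypothetical operator shape: configuration is kept, runtime helpers are transient.
class FakeXmlParser implements Serializable
{
  private static final long serialVersionUID = 1L;
  String schemaXSDFile = "schema.xsd";
  transient Object validator = new Object(); // java.lang.Object is not Serializable
}
```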



[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48221786
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/XmlParser.java ---
    @@ -0,0 +1,139 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.parser;
    +
    +import java.io.IOException;
    +import java.io.StringReader;
    +
    +import javax.xml.XMLConstants;
    +import javax.xml.bind.JAXBContext;
    +import javax.xml.bind.JAXBElement;
    +import javax.xml.bind.JAXBException;
    +import javax.xml.bind.Unmarshaller;
    +import javax.xml.parsers.DocumentBuilder;
    +import javax.xml.parsers.DocumentBuilderFactory;
    +import javax.xml.transform.stream.StreamSource;
    +import javax.xml.validation.Schema;
    +import javax.xml.validation.SchemaFactory;
    +import javax.xml.validation.Validator;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.w3c.dom.Document;
    +import org.xml.sax.SAXException;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +
    +
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.netlet.util.DTThrowable;
    +
    +public class XmlParser extends Parser<String>
    +{
    +  private String schemaXSDFile;
    +  private transient Unmarshaller unmarshaller;
    +  private transient Validator validator;
    +
    +  public transient DefaultOutputPort<Document> parsedOutput = new DefaultOutputPort<Document>();
    +
    +  @Override
    +  public Object convert(String tuple)
    +  {
    +    // This method is not invoked for XML parser
    +    return null;
    +  }
    +
    +  @Override
    +  public void processTuple(String inputTuple)
    +  {
    +    try {
    +      if (out.isConnected()) {
    +        StringReader reader = new StringReader(inputTuple);
    +        JAXBElement<?> output = unmarshaller.unmarshal(new StreamSource(reader), getClazz());
    +        LOG.debug(output.getValue().toString());
    +        emittedObjectCount++;
    +        out.emit(output.getValue());
    +      } else {
    +        validator.validate(new StreamSource(inputTuple));
    --- End diff --
    
    Why is validator.validate in the else branch? Shouldn't it be the first step?
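The "validate first" ordering asked about above can be sketched as follows, assuming only `javax.xml.validation`; the class and list names are hypothetical. Every tuple is checked against the XSD regardless of which output ports are connected, and only valid tuples move on to unmarshalling or DOM building. As with `DocumentBuilder.parse(String)`, the `StreamSource(String)` constructor takes a system ID, so the tuple text is wrapped in a `StringReader`.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.SchemaFactory;
import javax.xml.validation.Validator;
import org.xml.sax.SAXException;

// Hypothetical sketch: schema validation is always the first step of processTuple.
class ValidateFirstParser
{
  final List<String> valid = new ArrayList<>();  // tuples allowed to proceed
  final List<String> errors = new ArrayList<>(); // stands in for the error port
  private final Validator validator;

  ValidateFirstParser(String xsd)
  {
    try {
      validator = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
          .newSchema(new StreamSource(new StringReader(xsd)))
          .newValidator();
    } catch (SAXException e) {
      throw new IllegalArgumentException("Invalid XSD", e);
    }
  }

  void processTuple(String xml)
  {
    try {
      // Step 1: validate against the schema, independent of port connectivity.
      validator.validate(new StreamSource(new StringReader(xml)));
    } catch (SAXException | IOException e) {
      errors.add(xml);
      return;
    }
    // Step 2: only valid tuples reach unmarshalling / DOM building / emission.
    valid.add(xml);
  }
}
```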



[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48222378
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/Parser.java ---
    @@ -61,25 +70,48 @@ public void setup(PortContext context)
         }
       };
     
    -  @OutputPortFieldAnnotation(optional = true)
       public transient DefaultOutputPort<INPUT> err = new DefaultOutputPort<INPUT>();
    -
    +  public transient DefaultOutputPort<INPUT> validatedOutput = new DefaultOutputPort<INPUT>();
       public transient DefaultInputPort<INPUT> in = new DefaultInputPort<INPUT>()
       {
         @Override
         public void process(INPUT inputTuple)
         {
    -      Object tuple = convert(inputTuple);
    -      if (tuple == null && err.isConnected()) {
    -        err.emit(inputTuple);
    -        return;
    -      }
    -      if (out.isConnected()) {
    -        out.emit(tuple);
    -      }
    +      incomingTuplesCount++;
    +      processTuple(inputTuple);
         }
       };
     
    +  public void processTuple(INPUT inputTuple)
    +  {
    +    Object tuple = convert(inputTuple);
    +    if (tuple == null && err.isConnected()) {
    +      errorTupleCount++;
    +      err.emit(inputTuple);
    +      return;
    +    }
    +    if (out.isConnected()) {
    +      emittedObjectCount++;
    +      out.emit(tuple);
    +    }
    +  }
    +
    +  @Override
    +  public void beginWindow(long windowId)
    +  {
    +    errorTupleCount = 0;
    +    emittedObjectCount = 0;
    +    rollingPercentValidTuples = 0;
    +    rollingPercentErrorTuples = 0;
    +  }
    +
    +  @Override
    +  public void endWindow()
    +  {
    --- End diff --
    
    Why are we emitting percentages? AppDataTracker aggregates the counters on a per-window basis. On the ADT side we have sum/max/avg, etc., which do not hold or have no meaning for percentages.
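A small worked example of the concern above, in plain Java with hypothetical names. Averaging per-window percentages weights every window equally, however many tuples it saw. Recomputing the rate from summed counts weights every tuple equally. The two can diverge sharply.

```java
// Compare averaging per-window error percentages with recomputing the rate
// from summed counts. Each row is one window: {incoming tuples, error tuples}.
class PercentAggregation
{
  static double averageOfPercents(long[][] windows)
  {
    double sum = 0;
    for (long[] w : windows) {
      sum += 100.0 * w[1] / w[0];
    }
    return sum / windows.length;
  }

  static double pooledPercent(long[][] windows)
  {
    long incoming = 0;
    long errors = 0;
    for (long[] w : windows) {
      incoming += w[0];
      errors += w[1];
    }
    return 100.0 * errors / incoming;
  }
}
```

For two windows, `{1000, 0}` and `{10, 10}`, the per-window error percentages are 0% and 100%. Their average is 50% and their sum is 100%, yet only 10 of 1,010 tuples (about 0.99%) actually failed. Emitting raw counts, which sum cleanly, lets the overall rate be recomputed downstream.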



[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48379303
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/Parser.java ---
    @@ -61,25 +70,48 @@ public void setup(PortContext context)
         }
       };
     
    -  @OutputPortFieldAnnotation(optional = true)
       public transient DefaultOutputPort<INPUT> err = new DefaultOutputPort<INPUT>();
    --- End diff --
    
    Why don't we remove the error port and add a processErrorTuples() method that logs error messages by default? I think logging is the obvious default behavior.
    
    Users can add an error port and override processErrorTuples(). The reason is that we don't know what kind of tuples the error tuples are: the same type as the input tuples, or a different type.
    In the customer use cases seen so far, they are either input tuples that are logged to files, or of a completely different type.
    Currently we have no reason to assume that error tuples are KeyValPair, so that shouldn't be picked as the default.
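The proposal above can be sketched as an overridable hook, assuming hypothetical class names and using `java.util.logging` in place of the project's slf4j logger. The base class has no error port. Its `processErrorTuples()` logs by default, and a subclass overrides it to handle error tuples in whatever form it needs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

// Hypothetical base class: no error port, just an overridable hook that logs by default.
abstract class HookedParser<INPUT>
{
  private static final Logger LOG = Logger.getLogger(HookedParser.class.getName());

  /** Returns the converted object, or null if the input is invalid. */
  protected abstract Object convert(INPUT input);

  public void process(INPUT input)
  {
    Object converted = convert(input);
    if (converted == null) {
      processErrorTuples(input, "conversion failed");
      return;
    }
    emit(converted);
  }

  protected void emit(Object converted)
  {
    // Downstream emission (e.g. an output port) would go here.
  }

  /** Default behavior: log and drop. Subclasses may override, e.g. to emit on their own port. */
  protected void processErrorTuples(INPUT input, String reason)
  {
    LOG.warning("Error tuple (" + reason + "): " + input);
  }
}

// A user subclass that decides for itself what an error tuple looks like.
class CollectingIntParser extends HookedParser<String>
{
  final List<Object> emitted = new ArrayList<>();
  final List<String> errors = new ArrayList<>();

  @Override
  protected Object convert(String input)
  {
    try {
      return Integer.parseInt(input);
    } catch (NumberFormatException e) {
      return null;
    }
  }

  @Override
  protected void emit(Object converted)
  {
    emitted.add(converted);
  }

  @Override
  protected void processErrorTuples(String input, String reason)
  {
    errors.add(reason + ": " + input);
  }
}
```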



[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48103875
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/Parser.java ---
    @@ -61,25 +70,48 @@ public void setup(PortContext context)
         }
       };
     
    -  @OutputPortFieldAnnotation(optional = true)
       public transient DefaultOutputPort<INPUT> err = new DefaultOutputPort<INPUT>();
    --- End diff --
    
    Instead of err<INPUT>, I feel an error port of KeyValPair<String, String> would be better, since we could give the error reason along with the tuple.
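The suggestion is to pair each failing input with its error reason. The sketch below shows that pairing, with `java.util.AbstractMap.SimpleImmutableEntry` standing in for Malhar's `KeyValPair` and a list standing in for the error port. The class name is hypothetical, and integer parsing replaces XML parsing to keep the example short.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: key = original tuple, value = human-readable reason.
class ReasonedErrors
{
  final List<Map.Entry<String, String>> err = new ArrayList<>(); // stands in for the error port
  final List<Integer> out = new ArrayList<>();                   // stands in for the output port

  void process(String input)
  {
    try {
      out.add(Integer.parseInt(input));
    } catch (NumberFormatException e) {
      err.add(new SimpleImmutableEntry<>(input, e.getMessage()));
    }
  }
}
```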



[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48380423
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/Parser.java ---
    @@ -47,10 +47,19 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public abstract class Parser<INPUT> extends BaseOperator implements Converter<INPUT, Object>,
    -    ActivationListener<Context>
    +public abstract class Parser<INPUT> extends BaseOperator implements Converter<INPUT, Object>
     {
       protected transient Class<?> clazz;
    +  @AutoMetric
    +  protected long errorTupleCount;
    +  @AutoMetric
    +  protected long emittedObjectCount;
    +  @AutoMetric
    +  protected long rollingPercentValidTuples;
    +  @AutoMetric
    +  protected long rollingPercentErrorTuples;
    +  @AutoMetric
    +  protected long incomingTuplesCount;
     
       @OutputPortFieldAnnotation(schemaRequired = true)
    --- End diff --
    
    @ishark is correct. If the schemaRequired port is not connected, validation will not fail even though the TUPLE_CLASS attribute is missing.
    
    @shubham-pathak22 The schemaRequired annotation is an expressive way to let users know that processing the tuples on this port requires the tuple schema. If you face any limitations or problems with this feature, I suggest taking it up on Apex dev.
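The rule described above can be written as a tiny predicate. The sketch uses hypothetical types, not the Apex validation code: a `schemaRequired` port fails only when it is connected and has no TUPLE_CLASS. In an application, the attribute is typically set on the DAG, for example `dag.setOutputPortAttribute(parser.out, Context.PortContext.TUPLE_CLASS, MyPojo.class)`, where `MyPojo` is a placeholder.

```java
// Hypothetical model of the check, not the Apex API: only a connected
// schemaRequired port without TUPLE_CLASS is rejected.
class SchemaRequiredCheck
{
  static final class PortInfo
  {
    final boolean schemaRequired;
    final boolean connected;
    final Class<?> tupleClass; // null when TUPLE_CLASS is not set

    PortInfo(boolean schemaRequired, boolean connected, Class<?> tupleClass)
    {
      this.schemaRequired = schemaRequired;
      this.connected = connected;
      this.tupleClass = tupleClass;
    }
  }

  static void validate(String portName, PortInfo port)
  {
    if (port.schemaRequired && port.connected && port.tupleClass == null) {
      throw new IllegalStateException("TUPLE_CLASS not set on connected port " + portName);
    }
  }
}
```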



[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48223809
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/XmlParser.java ---
    @@ -0,0 +1,139 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.parser;
    +
    +import java.io.IOException;
    +import java.io.StringReader;
    +
    +import javax.xml.XMLConstants;
    +import javax.xml.bind.JAXBContext;
    +import javax.xml.bind.JAXBElement;
    +import javax.xml.bind.JAXBException;
    +import javax.xml.bind.Unmarshaller;
    +import javax.xml.parsers.DocumentBuilder;
    +import javax.xml.parsers.DocumentBuilderFactory;
    +import javax.xml.transform.stream.StreamSource;
    +import javax.xml.validation.Schema;
    +import javax.xml.validation.SchemaFactory;
    +import javax.xml.validation.Validator;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.w3c.dom.Document;
    +import org.xml.sax.SAXException;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +
    +
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.netlet.util.DTThrowable;
    +
    +public class XmlParser extends Parser<String>
    +{
    +  private String schemaXSDFile;
    +  private transient Unmarshaller unmarshaller;
    +  private transient Validator validator;
    +
    +  public transient DefaultOutputPort<Document> parsedOutput = new DefaultOutputPort<Document>();
    +
    +  @Override
    +  public Object convert(String tuple)
    +  {
    +    // This method is not invoked for XML parser
    +    return null;
    +  }
    +
    +  @Override
    +  public void processTuple(String inputTuple)
    +  {
    +    try {
    +      if (out.isConnected()) {
    +        StringReader reader = new StringReader(inputTuple);
    +        JAXBElement<?> output = unmarshaller.unmarshal(new StreamSource(reader), getClazz());
    +        LOG.debug(output.getValue().toString());
    +        emittedObjectCount++;
    +        out.emit(output.getValue());
    +      } else {
    +        validator.validate(new StreamSource(inputTuple));
    --- End diff --
    
    Can validator be null if no schema is specified?
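One way to make the optional schema explicit is sketched below, assuming only `javax.xml.validation`; the class name is hypothetical. The validator is created only when a schema is configured. A null validator then means "skip XSD validation" instead of causing a `NullPointerException` at `validator.validate(...)`.

```java
import java.io.IOException;
import java.io.StringReader;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.SchemaFactory;
import javax.xml.validation.Validator;
import org.xml.sax.SAXException;

// Hypothetical sketch: the validator exists only when an XSD was configured.
class OptionalXsdValidation
{
  private final Validator validator; // null when no schema was configured

  OptionalXsdValidation(String xsdOrNull)
  {
    this.validator = (xsdOrNull == null) ? null : compile(xsdOrNull);
  }

  private static Validator compile(String xsd)
  {
    try {
      return SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
          .newSchema(new StreamSource(new StringReader(xsd)))
          .newValidator();
    } catch (SAXException e) {
      throw new IllegalArgumentException("Invalid XSD", e);
    }
  }

  boolean isValid(String xml)
  {
    if (validator == null) {
      return true; // no schema configured: nothing to check
    }
    try {
      validator.validate(new StreamSource(new StringReader(xml)));
      return true;
    } catch (SAXException | IOException e) {
      return false;
    }
  }
}
```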



[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48103860
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/Parser.java ---
    @@ -61,25 +70,48 @@ public void setup(PortContext context)
         }
       };
     
    -  @OutputPortFieldAnnotation(optional = true)
       public transient DefaultOutputPort<INPUT> err = new DefaultOutputPort<INPUT>();
    -
    +  public transient DefaultOutputPort<INPUT> validatedOutput = new DefaultOutputPort<INPUT>();
    --- End diff --
    
    Do we need this? We had a discussion on eng about whether this port would be useful, and the opinion was that it won't be used.



[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48221588
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/XmlParser.java ---
    @@ -0,0 +1,139 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.parser;
    +
    +import java.io.IOException;
    +import java.io.StringReader;
    +
    +import javax.xml.XMLConstants;
    +import javax.xml.bind.JAXBContext;
    +import javax.xml.bind.JAXBElement;
    +import javax.xml.bind.JAXBException;
    +import javax.xml.bind.Unmarshaller;
    +import javax.xml.parsers.DocumentBuilder;
    +import javax.xml.parsers.DocumentBuilderFactory;
    +import javax.xml.transform.stream.StreamSource;
    +import javax.xml.validation.Schema;
    +import javax.xml.validation.SchemaFactory;
    +import javax.xml.validation.Validator;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.w3c.dom.Document;
    +import org.xml.sax.SAXException;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +
    +
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.netlet.util.DTThrowable;
    +
    +public class XmlParser extends Parser<String>
    +{
    +  private String schemaXSDFile;
    +  private transient Unmarshaller unmarshaller;
    +  private transient Validator validator;
    +
    +  public transient DefaultOutputPort<Document> parsedOutput = new DefaultOutputPort<Document>();
    +
    +  @Override
    +  public Object convert(String tuple)
    +  {
    +    // This method is not invoked for XML parser
    +    return null;
    +  }
    +
    +  @Override
    +  public void processTuple(String inputTuple)
    +  {
    +    try {
    +      if (out.isConnected()) {
    +        StringReader reader = new StringReader(inputTuple);
    +        JAXBElement<?> output = unmarshaller.unmarshal(new StreamSource(reader), getClazz());
    +        LOG.debug(output.getValue().toString());
    +        emittedObjectCount++;
    +        out.emit(output.getValue());
    +      } else {
    +        validator.validate(new StreamSource(inputTuple));
    +      }
    +      if (validatedOutput.isConnected()) {
    +        validatedOutput.emit(inputTuple);
    +      }
    +      if (parsedOutput.isConnected()) {
    +        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
    +        DocumentBuilder builder;
    +        try {
    +          builder = factory.newDocumentBuilder();
    +          Document doc = builder.parse(inputTuple);
    +          parsedOutput.emit(doc);
    +
    +        } catch (Exception e) {
    +          // It should not reach here, since inputTuple string is already validated.
    +        }
    +      }
    +    } catch (Exception e) {
    +      LOG.debug("Failed to parse xml tuple {}, Exception = {} ", inputTuple, e);
    +      errorTupleCount++;
    +      if (err.isConnected()) {
    +        err.emit(inputTuple);
    --- End diff --
    
    Can we have the error port emit the reason as well? Users would want to know why validation failed.
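A minimal sketch of reporting reasons, assuming an error tuple becomes a pair of the rejected input and its messages: an org.xml.sax.ErrorHandler installed on the Validator records every validation error with its line and column, instead of stopping at the first one. ErrorTuple and ReasonCollectingValidator are hypothetical names used only for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.SchemaFactory;
import javax.xml.validation.Validator;
import org.xml.sax.ErrorHandler;
import org.xml.sax.SAXException;
import org.xml.sax.SAXParseException;

// Hypothetical error tuple: the rejected input plus the reasons it was rejected.
final class ErrorTuple
{
  final String input;
  final List<String> reasons;

  ErrorTuple(String input, List<String> reasons)
  {
    this.input = input;
    this.reasons = reasons;
  }
}

// Hypothetical validator wrapper that collects every problem instead of stopping at the first.
final class ReasonCollectingValidator
{
  private final Validator validator;

  ReasonCollectingValidator(String xsdContent)
  {
    try {
      validator = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
          .newSchema(new StreamSource(new StringReader(xsdContent)))
          .newValidator();
    } catch (SAXException e) {
      throw new IllegalArgumentException("Invalid XSD", e);
    }
  }

  /** Returns null for a valid tuple, otherwise an ErrorTuple listing the reported problems. */
  ErrorTuple validate(String xmlTuple)
  {
    List<String> reasons = new ArrayList<>();
    validator.setErrorHandler(new ErrorHandler()
    {
      @Override
      public void warning(SAXParseException e)
      {
        // warnings are not treated as failures
      }

      @Override
      public void error(SAXParseException e)
      {
        reasons.add(describe(e));  // record and keep validating
      }

      @Override
      public void fatalError(SAXParseException e) throws SAXException
      {
        reasons.add(describe(e));
        throw e;  // not well-formed: validation cannot continue
      }
    });
    try {
      validator.validate(new StreamSource(new StringReader(xmlTuple)));
    } catch (SAXException e) {
      if (reasons.isEmpty()) {
        reasons.add(e.getMessage());  // failure not reported through the handler
      }
    } catch (IOException e) {
      reasons.add(e.toString());
    }
    return reasons.isEmpty() ? null : new ErrorTuple(xmlTuple, reasons);
  }

  private static String describe(SAXParseException e)
  {
    return "line " + e.getLineNumber() + ", column " + e.getColumnNumber() + ": " + e.getMessage();
  }
}
```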


[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48399214
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/Parser.java ---
    @@ -61,25 +70,48 @@ public void setup(PortContext context)
         }
       };
     
    -  @OutputPortFieldAnnotation(optional = true)
       public transient DefaultOutputPort<INPUT> err = new DefaultOutputPort<INPUT>();
    -
    +  public transient DefaultOutputPort<INPUT> validatedOutput = new DefaultOutputPort<INPUT>();
       public transient DefaultInputPort<INPUT> in = new DefaultInputPort<INPUT>()
       {
         @Override
         public void process(INPUT inputTuple)
         {
    -      Object tuple = convert(inputTuple);
    -      if (tuple == null && err.isConnected()) {
    -        err.emit(inputTuple);
    -        return;
    -      }
    -      if (out.isConnected()) {
    -        out.emit(tuple);
    -      }
    +      incomingTuplesCount++;
    +      processTuple(inputTuple);
         }
       };
     
    +  public void processTuple(INPUT inputTuple)
    +  {
    +    Object tuple = convert(inputTuple);
    +    if (tuple == null && err.isConnected()) {
    +      errorTupleCount++;
    +      err.emit(inputTuple);
    +      return;
    +    }
    +    if (out.isConnected()) {
    +      emittedObjectCount++;
    +      out.emit(tuple);
    +    }
    +  }
    +
    +  @Override
    +  public void beginWindow(long windowId)
    +  {
    +    errorTupleCount = 0;
    +    emittedObjectCount = 0;
    +    rollingPercentValidTuples = 0;
    +    rollingPercentErrorTuples = 0;
    +  }
    +
    +  @Override
    +  public void endWindow()
    +  {
    --- End diff --
    
    Thanks for the suggestion. I did not know that we can provide a dimensions scheme within the operator for the metrics. Do we have any examples?
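The endWindow body is cut off in the quoted diff. As a plain-Java sketch only (no Apex AutoMetric or dimensions scheme involved), per-window percentages could be derived from the counters reset in beginWindow, assuming "percent" means a share of the tuples received in the current window. If that is the intent, the denominator should be reset along with the other counters (the quoted beginWindow does not reset incomingTuplesCount), and the result needs a floating-point type, since the long fields in the diff can only hold whole percentages. ParserWindowMetrics is a hypothetical name.

```java
// Hypothetical per-window metric holder for illustration; not the Malhar Parser class.
class ParserWindowMetrics
{
  long incomingTuplesCount;
  long emittedObjectCount;
  long errorTupleCount;
  double percentValidTuples;
  double percentErrorTuples;

  void beginWindow()
  {
    // Reset the denominator together with the other counters.
    incomingTuplesCount = 0;
    emittedObjectCount = 0;
    errorTupleCount = 0;
    percentValidTuples = 0;
    percentErrorTuples = 0;
  }

  void recordValid()
  {
    incomingTuplesCount++;
    emittedObjectCount++;
  }

  void recordError()
  {
    incomingTuplesCount++;
    errorTupleCount++;
  }

  void endWindow()
  {
    if (incomingTuplesCount > 0) {
      // Floating-point arithmetic keeps fractional percentages.
      percentValidTuples = 100.0 * emittedObjectCount / incomingTuplesCount;
      percentErrorTuples = 100.0 * errorTupleCount / incomingTuplesCount;
    }
  }
}
```

The guard in endWindow also avoids dividing by zero in a window that received no tuples.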


[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r47725295
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/XmlParser.java ---
    @@ -0,0 +1,139 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.parser;
    +
    +import java.io.IOException;
    +import java.io.StringReader;
    +
    +import javax.xml.XMLConstants;
    +import javax.xml.bind.JAXBContext;
    +import javax.xml.bind.JAXBElement;
    +import javax.xml.bind.JAXBException;
    +import javax.xml.bind.Unmarshaller;
    +import javax.xml.parsers.DocumentBuilder;
    +import javax.xml.parsers.DocumentBuilderFactory;
    +import javax.xml.transform.stream.StreamSource;
    +import javax.xml.validation.Schema;
    +import javax.xml.validation.SchemaFactory;
    +import javax.xml.validation.Validator;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.w3c.dom.Document;
    +import org.xml.sax.SAXException;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +
    +
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.netlet.util.DTThrowable;
    +
    +public class XmlParser extends Parser<String>
    +{
    +  private String schemaXSDFile;
    +  private transient Unmarshaller unmarshaller;
    +  private transient Validator validator;
    +
    +  public transient DefaultOutputPort<Document> parsedOutput = new DefaultOutputPort<Document>();
    +
    +  @Override
    +  public Object convert(String tuple)
    +  {
    +    // This method is not invoked for XML parser
    +    return null;
    +  }
    +
    +  @Override
    +  public void processTuple(String inputTuple)
    +  {
    +    try {
    +      if (out.isConnected()) {
    +        StringReader reader = new StringReader(inputTuple);
    +        JAXBElement<?> output = unmarshaller.unmarshal(new StreamSource(reader), getClazz());
    +        LOG.debug(output.getValue().toString());
    +        emittedObjectCount++;
    +        out.emit(output.getValue());
    +      } else {
    +        validator.validate(new StreamSource(inputTuple));
    +      }
    +      if (validatedOutput.isConnected()) {
    +        validatedOutput.emit(inputTuple);
    +      }
    +      if (parsedOutput.isConnected()) {
    +        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
    +        DocumentBuilder builder;
    +        try {
    +          builder = factory.newDocumentBuilder();
    +          Document doc = builder.parse(inputTuple);
    +          parsedOutput.emit(doc);
    +
    +        } catch (Exception e) {
    +          e.printStackTrace();
    --- End diff --
    
    Swallowing the exception?
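A sketch of the alternative, assuming parse failures should go to the error path rather than be printed: the DocumentBuilder is created once and reused, failures are forwarded to an error callback together with the exception, and the tuple is wrapped in an InputSource over a StringReader, because DocumentBuilder.parse(String) treats its argument as a URI. DomEmitter and its two callbacks are hypothetical stand-ins for the operator and its parsedOutput and err ports.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;

// Hypothetical stand-in for the parsedOutput/err ports: plain callbacks.
final class DomEmitter
{
  private final DocumentBuilder builder;  // created once, reused per tuple (not thread-safe)
  private final Consumer<Document> parsedOutput;
  private final BiConsumer<String, Exception> errorOutput;

  DomEmitter(Consumer<Document> parsedOutput, BiConsumer<String, Exception> errorOutput)
  {
    try {
      builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
    } catch (ParserConfigurationException e) {
      throw new IllegalStateException(e);
    }
    // DefaultHandler rethrows fatal errors instead of printing them to stderr.
    builder.setErrorHandler(new DefaultHandler());
    this.parsedOutput = parsedOutput;
    this.errorOutput = errorOutput;
  }

  void process(String xmlTuple)
  {
    try {
      // InputSource over a StringReader: parse(String) would treat the tuple as a URI.
      Document doc = builder.parse(new InputSource(new StringReader(xmlTuple)));
      parsedOutput.accept(doc);
    } catch (SAXException | IOException e) {
      // Forward the failure instead of swallowing it.
      errorOutput.accept(xmlTuple, e);
    }
  }
}
```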


[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48381816
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/Parser.java ---
    @@ -61,25 +70,48 @@ public void setup(PortContext context)
         }
       };
     
    -  @OutputPortFieldAnnotation(optional = true)
       public transient DefaultOutputPort<INPUT> err = new DefaultOutputPort<INPUT>();
    --- End diff --
    
    The problem with that is that users will have to add another port, because the type here is tied to INPUT. I know it's optional here, but there will still be unused error ports.


[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48398302
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/Parser.java ---
    @@ -47,10 +47,19 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public abstract class Parser<INPUT> extends BaseOperator implements Converter<INPUT, Object>,
    -    ActivationListener<Context>
    +public abstract class Parser<INPUT> extends BaseOperator implements Converter<INPUT, Object>
     {
       protected transient Class<?> clazz;
    +  @AutoMetric
    +  protected long errorTupleCount;
    +  @AutoMetric
    +  protected long emittedObjectCount;
    +  @AutoMetric
    +  protected long rollingPercentValidTuples;
    +  @AutoMetric
    +  protected long rollingPercentErrorTuples;
    +  @AutoMetric
    +  protected long incomingTuplesCount;
     
       @OutputPortFieldAnnotation(schemaRequired = true)
    --- End diff --
    
    @ishark @chandnisingh Yes, you are correct. I tested it again. There is no need to remove the annotation.


[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48383113
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/Parser.java ---
    @@ -61,25 +70,48 @@ public void setup(PortContext context)
         }
       };
     
    -  @OutputPortFieldAnnotation(optional = true)
       public transient DefaultOutputPort<INPUT> err = new DefaultOutputPort<INPUT>();
    --- End diff --
    
    Yes, thanks Chandni. I will make the type of the error port generic, but I will leave the default behavior of emitting error tuples as is for now. We can revisit it later if needed.


[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r47725172
  
    --- Diff: library/src/main/java/com/datatorrent/lib/formatter/XmlFormatter.java ---
    @@ -57,45 +55,32 @@ public XmlFormatter()
       }
     
       @Override
    -  public void activate(Context context)
    +  public void setup(OperatorContext context)
       {
    -    if (prettyPrint) {
    -      xstream = new XStream();
    -    } else {
    -      xstream = new XStream(new XppDriver()
    -      {
    -        @Override
    -        public HierarchicalStreamWriter createWriter(Writer out)
    -        {
    -          return new CompactWriter(out, getNameCoder());
    -        }
    -      });
    -    }
    -    if (alias != null) {
    -      try {
    -        xstream.alias(alias, clazz);
    -      } catch (Throwable e) {
    -        throw new RuntimeException("Unable find provided class");
    -      }
    -    }
    -    if (dateFormat != null) {
    -      xstream.registerConverter(new DateConverter(dateFormat, new String[] {}));
    +    JAXBContext ctx;
    +    try {
    +      ctx = JAXBContext.newInstance(getClazz());
    +      marshaller = ctx.createMarshaller();
    +      marshaller.setProperty(Marshaller.JAXB_FRAGMENT, true);
    +      marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, prettyPrint);
    +    } catch (JAXBException e) {
    +      DTThrowable.wrapIfChecked(e);
         }
       }
     
       @Override
    -  public void deactivate()
    -  {
    -
    -  }
    -
    -  @Override
       public String convert(Object tuple)
       {
         try {
    -      return xstream.toXML(tuple);
    -    } catch (XStreamException e) {
    -      logger.debug("Error while converting tuple {} {} ",tuple,e.getMessage());
    +      StringWriter writer = new StringWriter();
    +      if (getAlias() != null) {
    +        marshaller.marshal(new JAXBElement(new QName(getAlias()), tuple.getClass(), tuple), writer);
    +      } else {
    +        marshaller.marshal(new JAXBElement(new QName(getClazz().getSimpleName()), tuple.getClass(), tuple), writer);
    +      }
    +      return writer.toString();
    +    } catch (Exception e) {
    +      logger.debug("Error while converting tuple {} {} ", tuple, e.getMessage());
           return null;
    --- End diff --
    
    Swallowing the exception?
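Returning null drops the cause of the failure. A minimal sketch, assuming callers want the exception available for the error port: the conversion result keeps either the output or the failure. ThrowingConverter, ConversionResult and SafeConverter are hypothetical names. The JAXB marshalling call from the diff (which throws the checked JAXBException) would be plugged in as the converter; here it is replaced by simple lambdas so the example runs without JAXB on the classpath.

```java
// Hypothetical functional interface: a converter allowed to throw checked exceptions,
// as JAXB's Marshaller.marshal does.
@FunctionalInterface
interface ThrowingConverter<I, O>
{
  O convert(I input) throws Exception;
}

// Hypothetical result type: holds either the converted output or the failure.
final class ConversionResult<T>
{
  final T value;
  final Exception error;

  private ConversionResult(T value, Exception error)
  {
    this.value = value;
    this.error = error;
  }

  static <T> ConversionResult<T> ok(T value)
  {
    return new ConversionResult<>(value, null);
  }

  static <T> ConversionResult<T> failed(Exception error)
  {
    return new ConversionResult<>(null, error);
  }

  boolean isOk()
  {
    return error == null;
  }
}

final class SafeConverter
{
  /** Applies the converter and keeps any exception instead of returning null. */
  static <I, O> ConversionResult<O> convert(I tuple, ThrowingConverter<I, O> converter)
  {
    try {
      return ConversionResult.ok(converter.convert(tuple));
    } catch (Exception e) {
      return ConversionResult.failed(e);
    }
  }
}
```

The caller can then emit result.value on the output port, or emit the tuple together with result.error on the error port.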


[GitHub] incubator-apex-malhar pull request: Added XSD validation to XML pa...

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/137#discussion_r48398502
  
    --- Diff: library/src/main/java/com/datatorrent/lib/parser/Parser.java ---
    @@ -61,25 +70,48 @@ public void setup(PortContext context)
         }
       };
     
    -  @OutputPortFieldAnnotation(optional = true)
       public transient DefaultOutputPort<INPUT> err = new DefaultOutputPort<INPUT>();
    --- End diff --
    
    As part of the requirement, the CSV, JSON, and XML parser operators would have to be enhanced to emit error tuples with an error message. I get @chandnisingh's suggestion of not making the error port a KeyValuePair by default, but its type shouldn't be INPUT either, which is the case now. A better approach would be to have a generic ERROR type and leave the implementation to child operators.
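A sketch of that approach under stated assumptions: the base parser takes a second type parameter for error tuples, and each subclass decides what an error tuple contains. CollectingPort is a hypothetical stand-in for DefaultOutputPort so the example runs without the Apex API, and IntParser is an illustrative subclass, not an operator from the library.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal port that collects emitted tuples (stands in for DefaultOutputPort).
final class CollectingPort<T>
{
  final List<T> emitted = new ArrayList<>();

  void emit(T tuple)
  {
    emitted.add(tuple);
  }
}

// Base class leaves the error tuple type to subclasses.
abstract class GenericParser<INPUT, ERROR>
{
  final CollectingPort<Object> out = new CollectingPort<>();
  final CollectingPort<ERROR> err = new CollectingPort<>();

  /** Converts the input; throws on failure. */
  protected abstract Object convert(INPUT input) throws Exception;

  /** Subclasses decide what an error tuple looks like. */
  protected abstract ERROR toError(INPUT input, Exception cause);

  void process(INPUT input)
  {
    try {
      out.emit(convert(input));
    } catch (Exception e) {
      err.emit(toError(input, e));
    }
  }
}

// Illustrative subclass: error tuples are "input -> reason" strings.
final class IntParser extends GenericParser<String, String>
{
  @Override
  protected Object convert(String input)
  {
    return Integer.parseInt(input.trim());
  }

  @Override
  protected String toError(String input, Exception cause)
  {
    return input + " -> " + cause.getMessage();
  }
}
```

A JSON or XML parser could instead choose a key-value pair or a dedicated error class as its ERROR type without adding a second error port.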
