You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by GitBox <gi...@apache.org> on 2020/10/14 20:26:43 UTC

[GitHub] [camel] igarashitm opened a new pull request #4447: CAMEL-15689 - Add camel-atlasmap component

igarashitm opened a new pull request #4447:
URL: https://github.com/apache/camel/pull/4447


   - [ ] Make sure there is a [JIRA issue](https://issues.apache.org/jira/browse/CAMEL) filed for the change (usually before you start working on it).  Trivial changes like typos do not require a JIRA issue.  Your pull request should address just this issue, without pulling in other changes.
   - [ ] Each commit in the pull request should have a meaningful subject line and body.
   - [ ] If you're unsure, you can format the pull request title like `[CAMEL-XXX] Fixes bug in camel-file component`, where you replace `CAMEL-XXX` with the appropriate JIRA issue.
   - [ ] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
   - [ ] Run `mvn clean install -Psourcecheck` in your module with source check enabled to make sure basic checks pass and there are no checkstyle violations. A more thorough check will be performed on your pull request automatically.
   Below are the contribution guidelines:
   https://github.com/apache/camel/blob/master/CONTRIBUTING.md


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel] aldettinger commented on a change in pull request #4447: CAMEL-15689 - Add camel-atlasmap component

Posted by GitBox <gi...@apache.org>.
aldettinger commented on a change in pull request #4447:
URL: https://github.com/apache/camel/pull/4447#discussion_r505297855



##########
File path: components/camel-atlasmap/src/main/java/org/apache/camel/component/atlasmap/AtlasMapEndpoint.java
##########
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atlasmap;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import io.atlasmap.api.AtlasContext;
+import io.atlasmap.api.AtlasContextFactory;
+import io.atlasmap.api.AtlasException;
+import io.atlasmap.api.AtlasSession;
+import io.atlasmap.core.DefaultAtlasContextFactory;
+import io.atlasmap.v2.Audit;
+import io.atlasmap.v2.DataSource;
+import io.atlasmap.v2.DataSourceType;
+import org.apache.camel.Category;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.component.ResourceEndpoint;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.support.MessageHelper;
+import org.apache.camel.support.ResourceHelper;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static io.atlasmap.api.AtlasContextFactory.Format.ADM;
+import static io.atlasmap.api.AtlasContextFactory.Format.JSON;
+
+/**
+ * Transforms the message using an AtlasMap transformation.
+ */
+@UriEndpoint(firstVersion = "3.7.0", scheme = "atlasmap", title = "AtlasMap", syntax = "atlasmap:resourceUri",
+             producerOnly = true, category = { Category.TRANSFORMATION })
+public class AtlasMapEndpoint extends ResourceEndpoint {
+
+    public static final String CONTENT_TYPE_JSON = "application/json";
+    public static final String CONTENT_TYPE_XML = "application/xml";
+
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasMapEndpoint.class);
+    private AtlasContextFactory atlasContextFactory;
+    private AtlasContext atlasContext;
+
+    @UriParam(label = "advanced")
+    private String propertiesFile;
+    @UriParam
+    private String sourceMapName;
+    @UriParam
+    private String targetMapName;
+    @UriParam(defaultValue = "MAP")
+    private TargetMapMode targetMapMode = TargetMapMode.MAP;
+
+    public enum TargetMapMode {
+        MAP,
+        MESSAGE_HEADER,
+        EXCHANGE_PROPERTY;
+    }
+
+    public AtlasMapEndpoint(String uri, AtlasMapComponent component, String resourceUri) {
+        super(uri, component, resourceUri);
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    @Override
+    public ExchangePattern getExchangePattern() {
+        return ExchangePattern.InOut;
+    }
+
+    @Override
+    protected String createEndpointUri() {
+        return "atlasmap:" + getResourceUri();
+    }
+
+    public AtlasContextFactory getAtlasContextFactory() {
+        return this.atlasContextFactory;
+    }
+
+    public void setAtlasContextFactory(AtlasContextFactory atlasContextFactory) {
+        this.atlasContextFactory = atlasContextFactory;
+    }
+
+    public AtlasContext getAtlasContext() {
+        return this.atlasContext;
+    }
+
+    public void setAtlasContext(AtlasContext atlasContext) {
+        this.atlasContext = atlasContext;
+    }
+
+    /**
+     * The URI of the properties file which is used for AtlasContextFactory initialization.
+     * 
+     * @param file property file path
+     */
+    public void setPropertiesFile(String file) {
+        propertiesFile = file;
+    }
+
+    public String getPropertiesFile() {
+        return propertiesFile;
+    }
+
+    /**
+     * The Exchange property name for a source message map which hold <code>java.util.Map&lt;String, Message&gt;</code>
+     * where the key is AtlasMap Document ID. AtlasMap consumes Message bodies as source documents, as well as message
+     * headers as source properties where the scope equals to Document ID.
+     * 
+     * @param name Exchange property name for source map
+     */
+    public void setSourceMapName(String name) {
+        this.sourceMapName = name;
+    }
+
+    public String getSourceMapName() {
+        return this.sourceMapName;
+    }
+
+    /**
+     * The Exchange property name for a target document map which hold <code>java.util.Map&lt;String, Object&gt;</code>
+     * where the key is AtlasMap Document ID. AtlasMap populates multiple target documents into this map.
+     * 
+     * @param name Exchange property name for target map
+     */
+    public void setTargetMapName(String name) {
+        this.targetMapName = name;
+    }
+
+    public String getTargetMapName() {
+        return this.targetMapName;
+    }
+
+    /**
+     * {@link TargetMapMode} enum value to specify how multiple target documents are delivered if exist.
+     * <ul>
+     * <li>'MAP': Stores them into a java.util.Map, and the java.util.Map is set to an exchange" property if
+     * 'targetMapName' is specified, otherwise message body.</li>"
+     * <li>'MESSAGE_HEADER': Stores them into message headers.</li>"
+     * <li>'EXCHANGE_PROPERTY': Stores them into exchange properties.</li>
+     * </ul>
+     * ")
+     * 
+     * @param mode {@link TargetMapMode}
+     */
+    public void setTargetMapMode(TargetMapMode mode) {
+        this.targetMapMode = mode;
+    }
+
+    public TargetMapMode getTargetMapMode() {
+        return this.targetMapMode;
+    }
+
+    public AtlasMapEndpoint findOrCreateEndpoint(String uri, String newResourceUri) {
+        String newUri = uri.replace(getResourceUri(), newResourceUri);
+        log.debug("Getting endpoint with URI: {}", newUri);
+        return getCamelContext().getEndpoint(newUri, AtlasMapEndpoint.class);
+    }
+
+    @Override
+    protected void onExchange(Exchange exchange) throws Exception {
+        Message incomingMessage = exchange.getIn();
+        String newResourceUri = incomingMessage.getHeader(AtlasMapConstants.ATLAS_RESOURCE_URI, String.class);
+        if (newResourceUri != null) {
+            incomingMessage.removeHeader(AtlasMapConstants.ATLAS_RESOURCE_URI);
+
+            log.debug("{} set to {} creating new endpoint to handle exchange", AtlasMapConstants.ATLAS_RESOURCE_URI,
+                    newResourceUri);
+            AtlasMapEndpoint newEndpoint = findOrCreateEndpoint(getEndpointUri(), newResourceUri);
+            newEndpoint.onExchange(exchange);
+            return;
+        }
+
+        AtlasSession atlasSession = getOrCreateAtlasContext(incomingMessage).createSession();
+        populateSourceDocuments(exchange, atlasSession);
+        atlasSession.getAtlasContext().process(atlasSession);
+
+        List<Audit> errors = new ArrayList<>();
+        for (Audit audit : atlasSession.getAudits().getAudit()) {
+            switch (audit.getStatus()) {
+                case ERROR:
+                    errors.add(audit);
+                    break;
+                case WARN:
+                    LOG.warn("{}: Document='{}(ID:{})', path='{}'",
+                            audit.getMessage(), audit.getDocName(), audit.getDocId(), audit.getPath());
+                    break;
+                default:
+                    LOG.info("{}: Document='{}(ID:{})', path='{}'",
+                            audit.getMessage(), audit.getDocName(), audit.getDocId(), audit.getPath());
+            }
+        }
+        if (!errors.isEmpty()) {
+            StringBuilder buf = new StringBuilder("Errors: ");
+            errors.stream().forEach(a -> buf.append(
+                    String.format("[%s: Document='{}(ID:{})', path='%s'], ",

Review comment:
       Do we have a mix here where last 2 params are ignored ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel] igarashitm merged pull request #4447: CAMEL-15689 - Add camel-atlasmap component

Posted by GitBox <gi...@apache.org>.
igarashitm merged pull request #4447:
URL: https://github.com/apache/camel/pull/4447


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel] igarashitm commented on a change in pull request #4447: CAMEL-15689 - Add camel-atlasmap component

Posted by GitBox <gi...@apache.org>.
igarashitm commented on a change in pull request #4447:
URL: https://github.com/apache/camel/pull/4447#discussion_r505463038



##########
File path: components/camel-atlasmap/src/main/java/org/apache/camel/component/atlasmap/AtlasMapEndpoint.java
##########
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atlasmap;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import io.atlasmap.api.AtlasContext;
+import io.atlasmap.api.AtlasContextFactory;
+import io.atlasmap.api.AtlasException;
+import io.atlasmap.api.AtlasSession;
+import io.atlasmap.core.DefaultAtlasContextFactory;
+import io.atlasmap.v2.Audit;
+import io.atlasmap.v2.DataSource;
+import io.atlasmap.v2.DataSourceType;
+import org.apache.camel.Category;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.component.ResourceEndpoint;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.support.MessageHelper;
+import org.apache.camel.support.ResourceHelper;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static io.atlasmap.api.AtlasContextFactory.Format.ADM;
+import static io.atlasmap.api.AtlasContextFactory.Format.JSON;
+
+/**
+ * Transforms the message using an AtlasMap transformation.
+ */
+@UriEndpoint(firstVersion = "3.7.0", scheme = "atlasmap", title = "AtlasMap", syntax = "atlasmap:resourceUri",
+             producerOnly = true, category = { Category.TRANSFORMATION })
+public class AtlasMapEndpoint extends ResourceEndpoint {
+
+    public static final String CONTENT_TYPE_JSON = "application/json";
+    public static final String CONTENT_TYPE_XML = "application/xml";
+
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasMapEndpoint.class);
+    private AtlasContextFactory atlasContextFactory;
+    private AtlasContext atlasContext;
+
+    @UriParam(label = "advanced")
+    private String propertiesFile;
+    @UriParam
+    private String sourceMapName;
+    @UriParam
+    private String targetMapName;
+    @UriParam(defaultValue = "MAP")
+    private TargetMapMode targetMapMode = TargetMapMode.MAP;
+
+    public enum TargetMapMode {
+        MAP,
+        MESSAGE_HEADER,
+        EXCHANGE_PROPERTY;
+    }
+
+    public AtlasMapEndpoint(String uri, AtlasMapComponent component, String resourceUri) {
+        super(uri, component, resourceUri);
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    @Override
+    public ExchangePattern getExchangePattern() {
+        return ExchangePattern.InOut;
+    }
+
+    @Override
+    protected String createEndpointUri() {
+        return "atlasmap:" + getResourceUri();
+    }
+
+    public AtlasContextFactory getAtlasContextFactory() {
+        return this.atlasContextFactory;
+    }
+
+    public void setAtlasContextFactory(AtlasContextFactory atlasContextFactory) {
+        this.atlasContextFactory = atlasContextFactory;
+    }
+
+    public AtlasContext getAtlasContext() {
+        return this.atlasContext;
+    }
+
+    public void setAtlasContext(AtlasContext atlasContext) {
+        this.atlasContext = atlasContext;
+    }
+
+    /**
+     * The URI of the properties file which is used for AtlasContextFactory initialization.
+     * 
+     * @param file property file path
+     */
+    public void setPropertiesFile(String file) {
+        propertiesFile = file;
+    }
+
+    public String getPropertiesFile() {
+        return propertiesFile;
+    }
+
+    /**
+     * The Exchange property name for a source message map which hold <code>java.util.Map&lt;String, Message&gt;</code>
+     * where the key is AtlasMap Document ID. AtlasMap consumes Message bodies as source documents, as well as message
+     * headers as source properties where the scope equals to Document ID.
+     * 
+     * @param name Exchange property name for source map
+     */
+    public void setSourceMapName(String name) {
+        this.sourceMapName = name;
+    }
+
+    public String getSourceMapName() {
+        return this.sourceMapName;
+    }
+
+    /**
+     * The Exchange property name for a target document map which hold <code>java.util.Map&lt;String, Object&gt;</code>
+     * where the key is AtlasMap Document ID. AtlasMap populates multiple target documents into this map.
+     * 
+     * @param name Exchange property name for target map
+     */
+    public void setTargetMapName(String name) {
+        this.targetMapName = name;
+    }
+
+    public String getTargetMapName() {
+        return this.targetMapName;
+    }
+
+    /**
+     * {@link TargetMapMode} enum value to specify how multiple target documents are delivered if exist.
+     * <ul>
+     * <li>'MAP': Stores them into a java.util.Map, and the java.util.Map is set to an exchange" property if
+     * 'targetMapName' is specified, otherwise message body.</li>"
+     * <li>'MESSAGE_HEADER': Stores them into message headers.</li>"
+     * <li>'EXCHANGE_PROPERTY': Stores them into exchange properties.</li>
+     * </ul>
+     * ")
+     * 
+     * @param mode {@link TargetMapMode}
+     */
+    public void setTargetMapMode(TargetMapMode mode) {
+        this.targetMapMode = mode;
+    }
+
+    public TargetMapMode getTargetMapMode() {
+        return this.targetMapMode;
+    }
+
+    public AtlasMapEndpoint findOrCreateEndpoint(String uri, String newResourceUri) {
+        String newUri = uri.replace(getResourceUri(), newResourceUri);
+        log.debug("Getting endpoint with URI: {}", newUri);
+        return getCamelContext().getEndpoint(newUri, AtlasMapEndpoint.class);
+    }
+
+    @Override
+    protected void onExchange(Exchange exchange) throws Exception {
+        Message incomingMessage = exchange.getIn();
+        String newResourceUri = incomingMessage.getHeader(AtlasMapConstants.ATLAS_RESOURCE_URI, String.class);
+        if (newResourceUri != null) {
+            incomingMessage.removeHeader(AtlasMapConstants.ATLAS_RESOURCE_URI);
+
+            log.debug("{} set to {} creating new endpoint to handle exchange", AtlasMapConstants.ATLAS_RESOURCE_URI,
+                    newResourceUri);
+            AtlasMapEndpoint newEndpoint = findOrCreateEndpoint(getEndpointUri(), newResourceUri);
+            newEndpoint.onExchange(exchange);
+            return;
+        }
+
+        AtlasSession atlasSession = getOrCreateAtlasContext(incomingMessage).createSession();
+        populateSourceDocuments(exchange, atlasSession);
+        atlasSession.getAtlasContext().process(atlasSession);
+
+        List<Audit> errors = new ArrayList<>();
+        for (Audit audit : atlasSession.getAudits().getAudit()) {
+            switch (audit.getStatus()) {
+                case ERROR:
+                    errors.add(audit);
+                    break;
+                case WARN:
+                    LOG.warn("{}: Document='{}(ID:{})', path='{}'",
+                            audit.getMessage(), audit.getDocName(), audit.getDocId(), audit.getPath());
+                    break;
+                default:
+                    LOG.info("{}: Document='{}(ID:{})', path='{}'",
+                            audit.getMessage(), audit.getDocName(), audit.getDocId(), audit.getPath());
+            }
+        }
+        if (!errors.isEmpty()) {
+            StringBuilder buf = new StringBuilder("Errors: ");
+            errors.stream().forEach(a -> buf.append(
+                    String.format("[%s: Document='{}(ID:{})', path='%s'], ",

Review comment:
       Doh good catch, thanks! fixed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org