You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by GitBox <gi...@apache.org> on 2020/10/15 07:57:37 UTC

[GitHub] [camel] aldettinger commented on a change in pull request #4447: CAMEL-15689 - Add camel-atlasmap component

aldettinger commented on a change in pull request #4447:
URL: https://github.com/apache/camel/pull/4447#discussion_r505297855



##########
File path: components/camel-atlasmap/src/main/java/org/apache/camel/component/atlasmap/AtlasMapEndpoint.java
##########
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atlasmap;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import io.atlasmap.api.AtlasContext;
+import io.atlasmap.api.AtlasContextFactory;
+import io.atlasmap.api.AtlasException;
+import io.atlasmap.api.AtlasSession;
+import io.atlasmap.core.DefaultAtlasContextFactory;
+import io.atlasmap.v2.Audit;
+import io.atlasmap.v2.DataSource;
+import io.atlasmap.v2.DataSourceType;
+import org.apache.camel.Category;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.component.ResourceEndpoint;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.support.MessageHelper;
+import org.apache.camel.support.ResourceHelper;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static io.atlasmap.api.AtlasContextFactory.Format.ADM;
+import static io.atlasmap.api.AtlasContextFactory.Format.JSON;
+
+/**
+ * Transforms the message using an AtlasMap transformation.
+ */
+@UriEndpoint(firstVersion = "3.7.0", scheme = "atlasmap", title = "AtlasMap", syntax = "atlasmap:resourceUri",
+             producerOnly = true, category = { Category.TRANSFORMATION })
+public class AtlasMapEndpoint extends ResourceEndpoint {
+
+    public static final String CONTENT_TYPE_JSON = "application/json";
+    public static final String CONTENT_TYPE_XML = "application/xml";
+
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasMapEndpoint.class);
+    private AtlasContextFactory atlasContextFactory;
+    private AtlasContext atlasContext;
+
+    @UriParam(label = "advanced")
+    private String propertiesFile;
+    @UriParam
+    private String sourceMapName;
+    @UriParam
+    private String targetMapName;
+    @UriParam(defaultValue = "MAP")
+    private TargetMapMode targetMapMode = TargetMapMode.MAP;
+
+    public enum TargetMapMode {
+        MAP,
+        MESSAGE_HEADER,
+        EXCHANGE_PROPERTY;
+    }
+
+    public AtlasMapEndpoint(String uri, AtlasMapComponent component, String resourceUri) {
+        super(uri, component, resourceUri);
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    @Override
+    public ExchangePattern getExchangePattern() {
+        return ExchangePattern.InOut;
+    }
+
+    @Override
+    protected String createEndpointUri() {
+        return "atlasmap:" + getResourceUri();
+    }
+
+    public AtlasContextFactory getAtlasContextFactory() {
+        return this.atlasContextFactory;
+    }
+
+    public void setAtlasContextFactory(AtlasContextFactory atlasContextFactory) {
+        this.atlasContextFactory = atlasContextFactory;
+    }
+
+    public AtlasContext getAtlasContext() {
+        return this.atlasContext;
+    }
+
+    public void setAtlasContext(AtlasContext atlasContext) {
+        this.atlasContext = atlasContext;
+    }
+
+    /**
+     * The URI of the properties file which is used for AtlasContextFactory initialization.
+     * 
+     * @param file property file path
+     */
+    public void setPropertiesFile(String file) {
+        propertiesFile = file;
+    }
+
+    public String getPropertiesFile() {
+        return propertiesFile;
+    }
+
+    /**
+     * The Exchange property name for a source message map which holds a
+     * <code>java.util.Map&lt;String, Message&gt;</code> where the key is the AtlasMap Document ID. AtlasMap consumes
+     * Message bodies as source documents, as well as message headers as source properties where the scope equals the
+     * Document ID.
+     * 
+     * @param name Exchange property name for source map
+     */
+    public void setSourceMapName(String name) {
+        this.sourceMapName = name;
+    }
+
+    public String getSourceMapName() {
+        return this.sourceMapName;
+    }
+
+    /**
+     * The Exchange property name for a target document map which holds a
+     * <code>java.util.Map&lt;String, Object&gt;</code> where the key is the AtlasMap Document ID. AtlasMap populates
+     * multiple target documents into this map.
+     * 
+     * @param name Exchange property name for target map
+     */
+    public void setTargetMapName(String name) {
+        this.targetMapName = name;
+    }
+
+    public String getTargetMapName() {
+        return this.targetMapName;
+    }
+
+    /**
+     * {@link TargetMapMode} enum value to specify how multiple target documents are delivered if they exist.
+     * <ul>
+     * <li>'MAP': Stores them into a java.util.Map, and the java.util.Map is set to an exchange property if
+     * 'targetMapName' is specified, otherwise to the message body.</li>
+     * <li>'MESSAGE_HEADER': Stores them into message headers.</li>
+     * <li>'EXCHANGE_PROPERTY': Stores them into exchange properties.</li>
+     * </ul>
+     * 
+     * @param mode {@link TargetMapMode}
+     */
+    public void setTargetMapMode(TargetMapMode mode) {
+        this.targetMapMode = mode;
+    }
+
+    public TargetMapMode getTargetMapMode() {
+        return this.targetMapMode;
+    }
+
+    public AtlasMapEndpoint findOrCreateEndpoint(String uri, String newResourceUri) {
+        String newUri = uri.replace(getResourceUri(), newResourceUri);
+        log.debug("Getting endpoint with URI: {}", newUri);
+        return getCamelContext().getEndpoint(newUri, AtlasMapEndpoint.class);
+    }
+
+    @Override
+    protected void onExchange(Exchange exchange) throws Exception {
+        Message incomingMessage = exchange.getIn();
+        String newResourceUri = incomingMessage.getHeader(AtlasMapConstants.ATLAS_RESOURCE_URI, String.class);
+        if (newResourceUri != null) {
+            incomingMessage.removeHeader(AtlasMapConstants.ATLAS_RESOURCE_URI);
+
+            log.debug("{} set to {} creating new endpoint to handle exchange", AtlasMapConstants.ATLAS_RESOURCE_URI,
+                    newResourceUri);
+            AtlasMapEndpoint newEndpoint = findOrCreateEndpoint(getEndpointUri(), newResourceUri);
+            newEndpoint.onExchange(exchange);
+            return;
+        }
+
+        AtlasSession atlasSession = getOrCreateAtlasContext(incomingMessage).createSession();
+        populateSourceDocuments(exchange, atlasSession);
+        atlasSession.getAtlasContext().process(atlasSession);
+
+        List<Audit> errors = new ArrayList<>();
+        for (Audit audit : atlasSession.getAudits().getAudit()) {
+            switch (audit.getStatus()) {
+                case ERROR:
+                    errors.add(audit);
+                    break;
+                case WARN:
+                    LOG.warn("{}: Document='{}(ID:{})', path='{}'",
+                            audit.getMessage(), audit.getDocName(), audit.getDocId(), audit.getPath());
+                    break;
+                default:
+                    LOG.info("{}: Document='{}(ID:{})', path='{}'",
+                            audit.getMessage(), audit.getDocName(), audit.getDocId(), audit.getPath());
+            }
+        }
+        if (!errors.isEmpty()) {
+            StringBuilder buf = new StringBuilder("Errors: ");
+            errors.stream().forEach(a -> buf.append(
+                    String.format("[%s: Document='{}(ID:{})', path='%s'], ",

Review comment:
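       Do we have a mix of placeholder styles here? String.format only substitutes the %s specifiers, so the SLF4J-style {} placeholders stay literal and the last 2 params are ignored.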




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org