Posted to issues@nifi.apache.org by mattyb149 <gi...@git.apache.org> on 2017/12/15 17:52:44 UTC

[GitHub] nifi pull request #2231: NIFI-4521 MS SQL CDC Processor

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2231#discussion_r157259349
  
    --- Diff: nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java ---
    @@ -0,0 +1,387 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.cdc.mssql.processors;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.cdc.CDCException;
    +import org.apache.nifi.cdc.mssql.MSSQLCDCUtils;
    +import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo;
    +import org.apache.nifi.cdc.mssql.event.TableCapturePlan;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.dbcp.DBCPService;
    +import org.apache.nifi.expression.AttributeExpression;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.io.StreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.RecordSetWriter;
    +import org.apache.nifi.serialization.RecordSetWriterFactory;
    +import org.apache.nifi.serialization.WriteResult;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.ResultSetRecordSet;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.sql.Connection;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +@TriggerSerially
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@Tags({"sql", "jdbc", "cdc", "mssql"})
    +@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a Microsoft SQL database. CDC events include INSERT, UPDATE, and DELETE operations. Events "
    +        + "for each table are output as Record Sets, ordered by the time and sequence at which the operation occurred.")
    +@Stateful(scopes = Scope.CLUSTER, description = "Information including the timestamp of the last CDC event per table in the database is stored by this processor, so "
    +        + "that it can continue from the same point in time if restarted.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "tablename", description="Name of the table this changeset was captured from."),
    +        @WritesAttribute(attribute="mssqlcdc.row.count", description="The number of rows in this changeset."),
    +        @WritesAttribute(attribute="fullsnapshot", description="Whether this was a full snapshot of the base table or not.")})
    +@DynamicProperty(name = "Initial Timestamp", value = "Attribute Expression Language", supportsExpressionLanguage = false, description = "Specifies an initial "
    +        + "timestamp for reading CDC data from MS SQL. Properties should be added in the format `initial.timestamp.{table_name}`, one for each table. "
    +        + "This property is ignored after the first successful run for a table writes to the state manager, and is only used again if state is cleared.")
    +public class CaptureChangeMSSQL extends AbstractSessionFactoryProcessor {
    +    public static final String INITIAL_TIMESTAMP_PROP_START = "initial.timestamp.";
    +
    +    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
    +            .name("record-writer")
    +            .displayName("Record Writer")
    +            .description("Specifies the Controller Service to use for writing out the records")
    +            .identifiesControllerService(RecordSetWriterFactory.class)
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
    +            .name("cdcmssql-dbcp-service")
    +            .displayName("Database Connection Pooling Service")
    +            .description("The Controller Service that is used to obtain a connection to the database")
    +            .required(true)
    +            .identifiesControllerService(DBCPService.class)
    +            .build();
    +
    +    public static final PropertyDescriptor CDC_TABLES = new PropertyDescriptor.Builder()
    +            .name("cdcmssql-cdc-table-list")
    +            .displayName("CDC Table List")
    +            .description("The comma-delimited list of tables in the source database to monitor for changes. If no tables "
    +                    + "are specified, the [cdc].[change_tables] table is queried for all available tables with change tracking enabled in the database.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TAKE_INITIAL_SNAPSHOT = new PropertyDescriptor.Builder()
    +            .name("cdcmssql-initial-snapshot")
    +            .displayName("Generate an Initial Source Table Snapshot")
    +            .description("Usually CDC only includes recent historic changes. Setting this property to true will cause a snapshot of the "
    +                + "source table to be taken using the same schema as the CDC extracts. The snapshot time will be used as the starting point "
    +                + "for extracting CDC changes.")
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor FULL_SNAPSHOT_ROW_LIMIT = new PropertyDescriptor
    +            .Builder().name("cdcmssql-full-snapshot-row-limit")
    +            .displayName("Change Set Row Limit")
    +            .description("If a very large change occurs on the source table, "
    +                    + "the generated change set may be too large to merge quickly into a destination system. "
    +                    + "Use this property to set a cut-off point where a full snapshot will be generated instead of a changeset. "
    +                    + "The fullsnapshot attribute will be set to true when this happens.")
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("Successfully created FlowFile from SQL query result set.")
    +            .build();
    +
    +    protected List<PropertyDescriptor> descriptors;
    +    protected Set<Relationship> relationships;
    +
    +    protected final Map<String, MSSQLTableInfo> schemaCache = new ConcurrentHashMap<String, MSSQLTableInfo>(1000);
    --- End diff --
    
    Should this be configurable due to memory concerns? If each MSSQLTableInfo is likely to be small (just a few short strings or whatever), then this number is probably fine.
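    
    If it does need to be configurable, a minimal sketch (not part of this PR) could expose the initial capacity as a processor property. The property name "cdcmssql-schema-cache-size", the setup method, and the validator below are assumptions for illustration; the descriptor would also need to be added to the processor's descriptor list:
    
        // Hypothetical property exposing the schema cache's initial capacity.
        public static final PropertyDescriptor SCHEMA_CACHE_SIZE = new PropertyDescriptor.Builder()
                .name("cdcmssql-schema-cache-size")
                .displayName("Schema Cache Size")
                .description("Initial capacity of the per-table schema cache. Each entry holds one MSSQLTableInfo, "
                        + "so memory use grows with the number of monitored tables.")
                .required(true)
                .defaultValue("1000")
                .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
                .build();
    
        // Sized from the configured capacity instead of the hard-coded 1000.
        protected volatile Map<String, MSSQLTableInfo> schemaCache;
    
        @OnScheduled
        public void setupSchemaCache(final ProcessContext context) {
            final int cacheSize = context.getProperty(SCHEMA_CACHE_SIZE).asInteger();
            schemaCache = new ConcurrentHashMap<>(cacheSize);
        }
    
    Note that the initial capacity only pre-sizes the map; entries still accumulate one per monitored table, so any hard memory cap or eviction policy would be a separate change.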


---