You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by mattyb149 <gi...@git.apache.org> on 2017/12/15 17:52:44 UTC
[GitHub] nifi pull request #2231: NIFI-4521 MS SQL CDC Processor
Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2231#discussion_r157259349
--- Diff: nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java ---
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mssql.processors;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.cdc.CDCException;
+import org.apache.nifi.cdc.mssql.MSSQLCDCUtils;
+import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo;
+import org.apache.nifi.cdc.mssql.event.TableCapturePlan;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.ResultSetRecordSet;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+@TriggerSerially
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Tags({"sql", "jdbc", "cdc", "mssql"})
+@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a Microsoft SQL database. CDC Events include INSERT, UPDATE, DELETE operations. Events "
+ + "for each table are output as Record Sets, ordered by the time, and sequence, at which the operation occurred.")
+@Stateful(scopes = Scope.CLUSTER, description = "Information including the timestamp of the last CDC event per table in the database is stored by this processor, so "
+ + "that it can continue from the same point in time if restarted.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "tablename", description="Name of the table this changeset was captured from."),
+ @WritesAttribute(attribute="mssqlcdc.row.count", description="The number of rows in this changeset"),
+ @WritesAttribute(attribute="fullsnapshot", description="Whether this was a full snapshot of the base table or not..")})
+@DynamicProperty(name = "Initial Timestamp", value = "Attribute Expression Language", supportsExpressionLanguage = false, description = "Specifies an initial "
+ + "timestamp for reading CDC data from MS SQL. Properties should be added in the format `initial.timestamp.{table_name}`, one for each table. "
+ + "This property is ignored after the first successful run for a table writes to the state manager, and is only used again if state is cleared.")
+public class CaptureChangeMSSQL extends AbstractSessionFactoryProcessor {
+ public static final String INITIAL_TIMESTAMP_PROP_START = "initial.timestamp.";
+
+ public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+ .name("record-writer")
+ .displayName("Record Writer")
+ .description("Specifies the Controller Service to use for writing out the records")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
+ .name("cdcmssql-dbcp-service")
+ .displayName("Database Connection Pooling Service")
+ .description("The Controller Service that is used to obtain connection to database")
+ .required(true)
+ .identifiesControllerService(DBCPService.class)
+ .build();
+
+ public static final PropertyDescriptor CDC_TABLES = new PropertyDescriptor.Builder()
+ .name("cdcmssql-cdc-table-list")
+ .displayName("CDC Table List")
+ .description("The comma delimited list of tables in the source database to monitor for changes. If no tables "
+ + "are specified the [cdc].[change_tables] table is queried for all of the available tables with change tracking enabled in the database.")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor TAKE_INITIAL_SNAPSHOT = new PropertyDescriptor.Builder()
+ .name("cdcmssql-initial-snapshot")
+ .displayName("Generate an Initial Source Table Snapshot")
+ .description("Usually CDC only includes recent historic changes. Setting this property to true will cause a snapshot of the "
+ + "source table to be taken using the same schema as the CDC extracts. The snapshot time will be used as the starting point "
+ + "for extracting CDC changes.")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor FULL_SNAPSHOT_ROW_LIMIT = new PropertyDescriptor
+ .Builder().name("cdcmssql-full-snapshot-row-limit")
+ .displayName("Change Set Row Limit")
+ .description("If a very large change occurs on the source table, "
+ + "the generated change set may be too large too quickly merge into a destination system. "
+ + "Use this property to set a cut-off point where instead of returning a changeset a full snapshot will be generated instead. "
+ + "The fullsnapshot attribute will be set to true when this happens.")
+ .required(true)
+ .defaultValue("0")
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .build();
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Successfully created FlowFile from SQL query result set.")
+ .build();
+
+ protected List<PropertyDescriptor> descriptors;
+ protected Set<Relationship> relationships;
+
+ protected final Map<String, MSSQLTableInfo> schemaCache = new ConcurrentHashMap<String, MSSQLTableInfo>(1000);
--- End diff --
Should this initial capacity (1000) for the schema cache be configurable due to memory concerns? If each MSSQLTableInfo is likely to be small (e.g., just a few short strings), then this number is probably fine.
---