You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2011/10/14 11:13:39 UTC
svn commit: r1183252 - in
/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel:
./ src/main/java/org/apache/flume/channel/jdbc/impl/
src/test/java/org/apache/flume/channel/jdbc/
Author: arvind
Date: Fri Oct 14 09:13:39 2011
New Revision: 1183252
URL: http://svn.apache.org/viewvc?rev=1183252&view=rev
Log:
Implemented persistence of events for derby.
Added:
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java
Modified:
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/ (props changed)
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java
Propchange: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Oct 14 09:13:39 2011
@@ -2,3 +2,4 @@
.project
.settings
target
+derby.log
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java?rev=1183252&r1=1183251&r2=1183252&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java Fri Oct 14 09:13:39 2011
@@ -17,18 +17,24 @@
*/
package org.apache.flume.channel.jdbc.impl;
+import java.io.ByteArrayInputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
import java.util.Set;
import javax.sql.DataSource;
import org.apache.flume.channel.jdbc.ConfigurationConstants;
import org.apache.flume.channel.jdbc.JdbcChannelException;
+import org.apache.flume.channel.jdbc.impl.PersistableEvent.HeaderEntry;
+import org.apache.flume.channel.jdbc.impl.PersistableEvent.SpillableString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -209,12 +215,35 @@ public class DerbySchemaHandler implemen
+ "FOREIGN KEY (" + COLUMN_FLV_HEADER + ") REFERENCES "
+ TABLE_FL_HEADER + " (" + COLUMN_FLH_ID + "))";
- public static final String COLUMN_LOOKUP_QUERY =
- "SELECT COLUMNNAME from SYS.SYSCOLUMNS where REFERENCEID = "
+ public static final String COLUMN_LOOKUP_QUERY
+ = "SELECT COLUMNNAME from SYS.SYSCOLUMNS where REFERENCEID = "
+ "(SELECT TABLEID FROM SYS.SYSTABLES WHERE TABLENAME = ? AND "
+ "SCHEMAID = (SELECT SCHEMAID FROM SYS.SYSSCHEMAS WHERE "
+ "SCHEMANAME = ? ))";
+ public static final String STMT_INSERT_EVENT_BASE
+ = "INSERT INTO " + TABLE_FL_EVENT + " ("
+ + COLUMN_FLE_PAYLOAD + ", " + COLUMN_FLE_CHANNEL + ", "
+ + COLUMN_FLE_SPILL + ") VALUES ( ?, ?, ?)";
+
+ public static final String STMT_INSERT_EVENT_SPILL
+ = "INSERT INTO " + TABLE_FL_PLSPILL + " ("
+ + COLUMN_FLP_EVENT + ", " + COLUMN_FLP_SPILL + ") VALUES ( ?, ?)";
+
+ public static final String STMT_INSERT_HEADER_BASE
+ = "INSERT INTO " + TABLE_FL_HEADER + " ("
+ + COLUMN_FLH_EVENT + ", " + COLUMN_FLH_NAME + ", " + COLUMN_FLH_VALUE
+ + ", " + COLUMN_FLH_NMSPILL + ", " + COLUMN_FLH_VLSPILL + ") VALUES "
+ + "( ?, ?, ?, ?, ?)";
+
+ public static final String STMT_INSERT_HEADER_NAME_SPILL
+ = "INSERT INTO " + TABLE_FL_NMSPILL + " ("
+ + COLUMN_FLN_HEADER + ", " + COLUMN_FLN_SPILL + ") VALUES ( ?, ?)";
+
+ public static final String STMT_INSERT_HEADER_VALUE_SPILL
+ = "INSERT INTO " + TABLE_FL_VLSPILL + " ("
+ + COLUMN_FLV_HEADER + ", " + COLUMN_FLV_SPILL + ") VALUES ( ?, ?)";
+
private final DataSource dataSource;
protected DerbySchemaHandler(DataSource dataSource) {
@@ -412,4 +441,201 @@ public class DerbySchemaHandler implemen
}
}
}
+
+ @Override
+ public void persistEvent(PersistableEvent pe, Connection connection) {
+ // First populate the main event table
+ byte[] basePayload = pe.getBasePayload();
+ byte[] spillPayload = pe.getSpillPayload();
+ boolean hasSpillPayload = (spillPayload != null);
+ String channelName = pe.getChannelName();
+
+ LOGGER.debug("Preparing insert event: " + pe);
+
+ PreparedStatement baseEventStmt = null;
+ PreparedStatement spillEventStmt = null;
+ PreparedStatement baseHeaderStmt = null;
+ PreparedStatement headerNameSpillStmt = null;
+ PreparedStatement headerValueSpillStmt = null;
+ try {
+ baseEventStmt = connection.prepareStatement(STMT_INSERT_EVENT_BASE,
+ Statement.RETURN_GENERATED_KEYS);
+ baseEventStmt.setBytes(1, basePayload);
+ baseEventStmt.setString(2, channelName);
+ baseEventStmt.setBoolean(3, hasSpillPayload);
+
+ int baseEventCount = baseEventStmt.executeUpdate();
+ if (baseEventCount != 1) {
+ throw new JdbcChannelException("Invalid update count on base "
+ + "event insert: " + baseEventCount);
+ }
+ // Extract event ID and set it
+ ResultSet eventIdResult = baseEventStmt.getGeneratedKeys();
+
+ if (!eventIdResult.next()) {
+ throw new JdbcChannelException("Unable to retrieive inserted event-id");
+ }
+
+ long eventId = eventIdResult.getLong(1);
+ pe.setEventId(eventId);
+
+ // Persist the payload spill
+ if (hasSpillPayload) {
+ spillEventStmt = connection.prepareStatement(STMT_INSERT_EVENT_SPILL);
+ spillEventStmt.setLong(1, eventId);
+ spillEventStmt.setBinaryStream(2,
+ new ByteArrayInputStream(spillPayload), spillPayload.length);
+ int spillEventCount = spillEventStmt.executeUpdate();
+ if (spillEventCount != 1) {
+ throw new JdbcChannelException("Invalid update count on spill "
+ + "event insert: " + spillEventCount);
+ }
+ }
+
+ // Persist the headers
+ List<HeaderEntry> headers = pe.getHeaderEntries();
+ if (headers != null && headers.size() > 0) {
+ List<HeaderEntry> headerWithNameSpill = new ArrayList<HeaderEntry>();
+ List<HeaderEntry> headerWithValueSpill = new ArrayList<HeaderEntry>();
+
+
+ baseHeaderStmt = connection.prepareStatement(STMT_INSERT_HEADER_BASE,
+ Statement.RETURN_GENERATED_KEYS);
+ Iterator<HeaderEntry> it = headers.iterator();
+ while (it.hasNext()) {
+ HeaderEntry entry = it.next();
+ SpillableString name = entry.getName();
+ SpillableString value = entry.getValue();
+ baseHeaderStmt.setLong(1, eventId);
+ baseHeaderStmt.setString(2, name.getBase());
+ baseHeaderStmt.setString(3, value.getBase());
+ baseHeaderStmt.setBoolean(4, name.hasSpill());
+ baseHeaderStmt.setBoolean(5, value.hasSpill());
+
+ int updateCount = baseHeaderStmt.executeUpdate();
+ if (updateCount != 1) {
+ throw new JdbcChannelException("Unexpected update header count: "
+ + updateCount);
+ }
+ ResultSet headerIdResultSet = baseHeaderStmt.getGeneratedKeys();
+ if (!headerIdResultSet.next()) {
+ throw new JdbcChannelException(
+ "Unable to retrieve inserted header id");
+ }
+ long headerId = headerIdResultSet.getLong(1);
+ entry.setId(headerId);
+
+ if (name.hasSpill()) {
+ headerWithNameSpill.add(entry);
+ }
+
+ if (value.hasSpill()) {
+ headerWithValueSpill.add(entry);
+ }
+ }
+
+ // Persist header name spills
+ if (headerWithNameSpill.size() > 0) {
+ LOGGER.debug("Number of headers with name spill: "
+ + headerWithNameSpill.size());
+
+ headerNameSpillStmt =
+ connection.prepareStatement(STMT_INSERT_HEADER_NAME_SPILL);
+
+ for (HeaderEntry entry : headerWithNameSpill) {
+ String nameSpill = entry.getName().getSpill();
+
+ headerNameSpillStmt.setLong(1, entry.getId());
+ headerNameSpillStmt.setString(2, nameSpill);
+ headerNameSpillStmt.addBatch();
+ }
+
+ int[] nameSpillUpdateCount = headerNameSpillStmt.executeBatch();
+ if (nameSpillUpdateCount.length != headerWithNameSpill.size()) {
+ throw new JdbcChannelException("Unexpected update count for header "
+ + "name spills: expected " + headerWithNameSpill.size() + ", "
+ + "found " + nameSpillUpdateCount.length);
+ }
+
+ for (int i = 0; i < nameSpillUpdateCount.length; i++) {
+ if (nameSpillUpdateCount[i] != 1) {
+ throw new JdbcChannelException("Unexpected update count for "
+ + "header name spill at position " + i + ", value: "
+ + nameSpillUpdateCount[i]);
+ }
+ }
+ }
+
+ // Persist header value spills
+ if (headerWithValueSpill.size() > 0) {
+ LOGGER.debug("Number of headers with value spill: "
+ + headerWithValueSpill.size());
+
+ headerValueSpillStmt =
+ connection.prepareStatement(STMT_INSERT_HEADER_VALUE_SPILL);
+
+ for(HeaderEntry entry : headerWithValueSpill) {
+ String valueSpill = entry.getValue().getSpill();
+
+ headerValueSpillStmt.setLong(1, entry.getId());
+ headerValueSpillStmt.setString(2, valueSpill);
+ headerValueSpillStmt.addBatch();
+ }
+
+ int[] valueSpillUpdateCount = headerValueSpillStmt.executeBatch();
+ if (valueSpillUpdateCount.length != headerWithValueSpill.size()) {
+ throw new JdbcChannelException("Unexpected update count for header "
+ + "value spills: expected " + headerWithValueSpill.size() + ", "
+ + "found " + valueSpillUpdateCount.length);
+ }
+
+ for (int i = 0; i < valueSpillUpdateCount.length; i++) {
+ if (valueSpillUpdateCount[i] != 1) {
+ throw new JdbcChannelException("Unexpected update count for "
+ + "header value spill at position " + i + ", value: "
+ + valueSpillUpdateCount[i]);
+ }
+ }
+ }
+ }
+ } catch (SQLException ex) {
+ throw new JdbcChannelException("Unable to persist event: " + pe, ex);
+ } finally {
+ if (baseEventStmt != null) {
+ try {
+ baseEventStmt.close();
+ } catch (SQLException ex) {
+ LOGGER.error("Unable to close base event statement", ex);
+ }
+ }
+ if (spillEventStmt != null) {
+ try {
+ spillEventStmt.close();
+ } catch (SQLException ex) {
+ LOGGER.error("Unable to close spill event statement", ex);
+ }
+ }
+ if (baseHeaderStmt != null) {
+ try {
+ baseHeaderStmt.close();
+ } catch (SQLException ex) {
+ LOGGER.error("Unable to close base header statement", ex);
+ }
+ }
+ if (headerNameSpillStmt != null) {
+ try {
+ headerNameSpillStmt.close();
+ } catch (SQLException ex) {
+ LOGGER.error("Unable to close header name spill statement", ex);
+ }
+ }
+ if (headerValueSpillStmt != null) {
+ try {
+ headerValueSpillStmt.close();
+ } catch (SQLException ex) {
+ LOGGER.error("Unable to close header value spill statement", ex);
+ }
+ }
+ }
+ }
}
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java?rev=1183252&r1=1183251&r2=1183252&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java Fri Oct 14 09:13:39 2011
@@ -170,14 +170,15 @@ public class JdbcChannelProviderImpl imp
}
@Override
- public void persistEvent(String channelName, Event event) {
- PersistableEvent persistableEvent = new PersistableEvent(event);
- Transaction tx = null;
+ public void persistEvent(String channel, Event event) {
+ PersistableEvent persistableEvent = new PersistableEvent(channel, event);
+ JdbcTransactionImpl tx = null;
try {
tx = getTransaction();
tx.begin();
// Persist the persistableEvent
+ schemaHandler.persistEvent(persistableEvent, tx.getConnection());
tx.commit();
} catch (Exception ex) {
@@ -197,7 +198,7 @@ public class JdbcChannelProviderImpl imp
}
@Override
- public Transaction getTransaction() {
+ public JdbcTransactionImpl getTransaction() {
return txFactory.get();
}
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java?rev=1183252&r1=1183251&r2=1183252&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java Fri Oct 14 09:13:39 2011
@@ -154,4 +154,11 @@ public class JdbcTransactionImpl impleme
}
}
}
+
+ protected Connection getConnection() {
+ if (!active) {
+ throw new JdbcChannelException("Inactive transaction");
+ }
+ return connection;
+ }
}
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java?rev=1183252&r1=1183251&r2=1183252&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/MySQLSchemaHandler.java Fri Oct 14 09:13:39 2011
@@ -1,5 +1,7 @@
package org.apache.flume.channel.jdbc.impl;
+import java.sql.Connection;
+
import javax.sql.DataSource;
public class MySQLSchemaHandler implements SchemaHandler {
@@ -28,4 +30,10 @@ public class MySQLSchemaHandler implemen
}
+ @Override
+ public void persistEvent(PersistableEvent pe, Connection connection) {
+ // TODO Auto-generated method stub
+
+ }
+
}
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java?rev=1183252&r1=1183251&r2=1183252&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java Fri Oct 14 09:13:39 2011
@@ -29,20 +29,22 @@ import org.apache.flume.channel.jdbc.Con
public class PersistableEvent {
private long eventId;
- private byte[] payload;
- private byte[] spill;
+ private final String channel;
+ private byte[] basePayload;
+ private byte[] spillPayload;
private List<HeaderEntry> headers;
- public PersistableEvent(Event event) {
+ public PersistableEvent(String channel, Event event) {
+ this.channel = channel;
byte[] givenPayload = event.getBody();
if (givenPayload.length < ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD) {
- payload = Arrays.copyOf(givenPayload, givenPayload.length);
- spill = null;
+ basePayload = Arrays.copyOf(givenPayload, givenPayload.length);
+ spillPayload = null;
} else {
- payload = Arrays.copyOfRange(givenPayload, 0,
+ basePayload = Arrays.copyOfRange(givenPayload, 0,
ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD);
- spill = Arrays.copyOfRange(givenPayload,
+ spillPayload = Arrays.copyOfRange(givenPayload,
ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD, givenPayload.length);
}
@@ -59,12 +61,13 @@ public class PersistableEvent {
public byte[] getPayload() {
byte[] result = null;
- if (spill == null) {
- result = Arrays.copyOf(payload, payload.length);
+ if (spillPayload == null) {
+ result = Arrays.copyOf(basePayload, basePayload.length);
} else {
- result = new byte[payload.length + spill.length];
- System.arraycopy(payload, 0, result, 0, payload.length);
- System.arraycopy(spill, 0, result, payload.length, spill.length);
+ result = new byte[basePayload.length + spillPayload.length];
+ System.arraycopy(basePayload, 0, result, 0, basePayload.length);
+ System.arraycopy(spillPayload, 0, result,
+ basePayload.length, spillPayload.length);
}
return result;
@@ -75,35 +78,73 @@ public class PersistableEvent {
if (headers != null) {
headerMap = new HashMap<String, String>();
for (HeaderEntry entry : headers) {
- headerMap.put(entry.getName(), entry.getValue());
+ headerMap.put(entry.getNameString(), entry.getValueString());
}
}
return headerMap;
}
- public static class HeaderEntry {
+ public String getChannelName() {
+ return channel;
+ }
+
+ public byte[] getBasePayload() {
+ return this.basePayload;
+ }
- private SpillableString nameString;
- private SpillableString valueString;
+ public byte[] getSpillPayload() {
+ return this.spillPayload;
+ }
+
+ protected void setEventId(long eventId) {
+ this.eventId = eventId;
+ }
+
+ public List<HeaderEntry> getHeaderEntries() {
+ return headers;
+ }
+
+ protected static class HeaderEntry {
+
+ private long headerId = -1L;
+ private SpillableString name;
+ private SpillableString value;
public HeaderEntry(String name, String value) {
- nameString = new SpillableString(name,
+ this.name = new SpillableString(name,
ConfigurationConstants.HEADER_NAME_LENGTH_THRESHOLD);
- valueString = new SpillableString(value,
+ this.value = new SpillableString(value,
ConfigurationConstants.HEADER_VALUE_LENGTH_THRESHOLD);
}
- public String getName() {
- return nameString.getString();
+ public String getNameString() {
+ return name.getString();
}
- public String getValue() {
- return valueString.getString();
+ public SpillableString getName() {
+ return name;
}
+
+ public String getValueString() {
+ return value.getString();
+ }
+
+ public SpillableString getValue() {
+ return value;
+ }
+
+ protected void setId(long headerId) {
+ this.headerId = headerId;
+ }
+
+ public long getId() {
+ return headerId;
+ }
+
}
- private static class SpillableString {
+ protected static class SpillableString {
private String base;
private String spill;
@@ -144,5 +185,9 @@ public class PersistableEvent {
}
return base + spill;
}
+
+ public boolean hasSpill() {
+ return spill != null;
+ }
}
}
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java?rev=1183252&r1=1183251&r2=1183252&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java Fri Oct 14 09:13:39 2011
@@ -17,7 +17,7 @@
*/
package org.apache.flume.channel.jdbc.impl;
-import javax.sql.DataSource;
+import java.sql.Connection;
/**
* <p>A handler for creating and validating database schema for use by
@@ -42,4 +42,14 @@ public interface SchemaHandler {
* @param connection the connection to create schema for.
*/
public void createSchemaObjects();
+
+ /**
+ * Inserts the given persistent event into the database. The connection that
+ * is passed into the handler has an ongoing transaction and therefore the
+ * SchemaHandler implementation must not close the connection.
+ *
+ * @param pe the event to persist
+ * @param connection the connection to use
+ */
+ public void persistEvent(PersistableEvent pe, Connection connection);
}
Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java?rev=1183252&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java Fri Oct 14 09:13:39 2011
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc;
+
+import java.util.Map;
+
+import org.apache.flume.Event;
+
+public class MockEvent implements Event {
+
+ private final byte[] payload;
+ private final Map<String, String> headers;
+
+ public MockEvent(byte[] payload, Map<String, String> headers) {
+ this.payload = payload;
+ this.headers = headers;
+ }
+
+ @Override
+ public Map<String, String> getHeaders() {
+ return headers;
+ }
+
+ @Override
+ public void setHeaders(Map<String, String> headers) {
+
+ }
+
+ @Override
+ public byte[] getBody() {
+ return payload;
+ }
+
+ @Override
+ public void setBody(byte[] body) {
+
+ }
+
+}
Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java?rev=1183252&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java Fri Oct 14 09:13:39 2011
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc;
+
+import java.util.Random;
+
+public final class MockEventUtils {
+
+ private static final Random RANDOM = new Random(System.currentTimeMillis());
+
+ private static final String[] CHARS = new String[] {
+ "a","b","c","d","e","f","g","h","i","j","k","l","m","n","o","p","q","r",
+ "s","t","u","v","w","x","y","z",
+ "A","B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R",
+ "S","T","U","V","W","X","Y","Z",
+ "0","1","2","3","4","5","6","7","8","9",
+ "!","@","#","$","%","^","&","*","(",")",
+ "[","]","{","}",":",";","\"","'",",",".","<",">","?","/","\\","|",
+ };
+
+ public static byte[] generatePayload(int size) {
+ byte[] result = new byte[size];
+ RANDOM.nextBytes(result);
+ return result;
+ }
+
+ public static String generateHeaderString(int size) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < size; i++) {
+ int x = Math.abs(RANDOM.nextInt());
+ int y = x % CHARS.length;
+ sb.append(CHARS[y]);
+ }
+ return sb.toString();
+ }
+
+ private MockEventUtils() {
+ // Disable explicit object creation
+ }
+
+
+}
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java?rev=1183252&r1=1183251&r2=1183252&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestJdbcChannelProvider.java Fri Oct 14 09:13:39 2011
@@ -1,11 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.flume.channel.jdbc;
import java.io.File;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
import junit.framework.Assert;
+import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.channel.jdbc.impl.JdbcChannelProviderImpl;
import org.junit.After;
@@ -80,6 +100,36 @@ public class TestJdbcChannelProvider {
provider = null;
}
+ @Test
+ public void testPeristingEvents() {
+ provider = new JdbcChannelProviderImpl();
+ provider.initialize(derbyProps);
+
+ int nameLimit = ConfigurationConstants.HEADER_NAME_LENGTH_THRESHOLD;
+ int th = ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD;
+
+ byte[] s1 = MockEventUtils.generatePayload(th - 1);
+ Map<String, String> m1 = new HashMap<String, String>();
+ m1.put(MockEventUtils.generateHeaderString(1), "one");
+ m1.put(MockEventUtils.generateHeaderString(2), "twotwo");
+ m1.put(MockEventUtils.generateHeaderString(3), "three");
+ m1.put(MockEventUtils.generateHeaderString(100), "ahundred");
+ m1.put(MockEventUtils.generateHeaderString(nameLimit - 21), "w");
+ m1.put(MockEventUtils.generateHeaderString(nameLimit - 2), "x");
+ m1.put(MockEventUtils.generateHeaderString(nameLimit - 1), "y");
+ m1.put(MockEventUtils.generateHeaderString(nameLimit), "z");
+ m1.put(MockEventUtils.generateHeaderString(nameLimit + 1), "a");
+ m1.put(MockEventUtils.generateHeaderString(nameLimit + 2), "b");
+ m1.put(MockEventUtils.generateHeaderString(nameLimit + 21), "c");
+
+ Event event = new MockEvent(s1, m1);
+
+ provider.persistEvent("test", event);
+
+ provider.close();
+ provider = null;
+ }
+
@After
public void tearDown() throws IOException {
if (provider != null) {
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java?rev=1183252&r1=1183251&r2=1183252&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestPersistentEvent.java Fri Oct 14 09:13:39 2011
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.flume.channel.jdbc;
import java.util.HashMap;
@@ -15,105 +32,80 @@ public class TestPersistentEvent {
private static final Logger LOGGER =
LoggerFactory.getLogger(TestPersistentEvent.class);
- private static final String[] CHARS = new String[] {
- "a","b","c","d","e","f","g","h","i","j","k","l","m","n","o","p","q","r",
- "s","t","u","v","w","x","y","z",
- "A","B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R",
- "S","T","U","V","W","X","Y","Z",
- "0","1","2","3","4","5","6","7","8","9",
- "!","@","#","$","%","^","&","*","(",")",
- "[","]","{","}",":",";","\"","'",",",".","<",">","?","/","\\","|",
- };
+
@Test
public void testMarshalling() {
- Random rnd = new Random(System.currentTimeMillis());
int nameLimit = ConfigurationConstants.HEADER_NAME_LENGTH_THRESHOLD;
int valLimit = ConfigurationConstants.HEADER_VALUE_LENGTH_THRESHOLD;
- byte[] s1 = new byte[1];
- rnd.nextBytes(s1);
+ byte[] s1 = MockEventUtils.generatePayload(1);
runTest(s1, null);
- byte[] s2 = new byte[2];
- rnd.nextBytes(s2);
+ byte[] s2 = MockEventUtils.generatePayload(2);
runTest(s2, new HashMap<String, String>());
- byte[] s3 = new byte[ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD - 2];
- rnd.nextBytes(s3);
+ int th = ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD;
+
+ byte[] s3 = MockEventUtils.generatePayload(th - 2);
Map<String, String> m3 = new HashMap<String, String>();
- m3.put(generateString(rnd, 1), generateString(rnd, 1));
+ m3.put(MockEventUtils.generateHeaderString(1),
+ MockEventUtils.generateHeaderString(1));
runTest(s3, m3);
- byte[] s4 = new byte[ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD - 1];
- rnd.nextBytes(s4);
+ byte[] s4 = MockEventUtils.generatePayload(th - 1);
Map<String, String> m4 = new HashMap<String, String>();
- m4.put(generateString(rnd, nameLimit - 21), "w");
- m4.put(generateString(rnd, nameLimit - 2), "x");
- m4.put(generateString(rnd, nameLimit - 1), "y");
- m4.put(generateString(rnd, nameLimit), "z");
- m4.put(generateString(rnd, nameLimit + 1), "a");
- m4.put(generateString(rnd, nameLimit + 2), "b");
- m4.put(generateString(rnd, nameLimit + 21), "c");
+ m4.put(MockEventUtils.generateHeaderString(nameLimit - 21), "w");
+ m4.put(MockEventUtils.generateHeaderString(nameLimit - 2), "x");
+ m4.put(MockEventUtils.generateHeaderString(nameLimit - 1), "y");
+ m4.put(MockEventUtils.generateHeaderString(nameLimit), "z");
+ m4.put(MockEventUtils.generateHeaderString(nameLimit + 1), "a");
+ m4.put(MockEventUtils.generateHeaderString(nameLimit + 2), "b");
+ m4.put(MockEventUtils.generateHeaderString(nameLimit + 21), "c");
runTest(s4, m4);
- byte[] s5 = new byte[ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD];
- rnd.nextBytes(s5);
+ byte[] s5 = MockEventUtils.generatePayload(th);
Map<String, String> m5 = new HashMap<String, String>();
- m5.put("w", generateString(rnd, valLimit - 21));
- m5.put("x", generateString(rnd, valLimit - 2));
- m5.put("y", generateString(rnd, valLimit - 1));
- m5.put("z", generateString(rnd, valLimit));
- m5.put("a", generateString(rnd, valLimit + 1));
- m5.put("b", generateString(rnd, valLimit + 2));
- m5.put("c", generateString(rnd, valLimit + 21));
+ m5.put("w", MockEventUtils.generateHeaderString(valLimit - 21));
+ m5.put("x", MockEventUtils.generateHeaderString(valLimit - 2));
+ m5.put("y", MockEventUtils.generateHeaderString(valLimit - 1));
+ m5.put("z", MockEventUtils.generateHeaderString(valLimit));
+ m5.put("a", MockEventUtils.generateHeaderString(valLimit + 1));
+ m5.put("b", MockEventUtils.generateHeaderString(valLimit + 2));
+ m5.put("c", MockEventUtils.generateHeaderString(valLimit + 21));
runTest(s5, m5);
- byte[] s6 = new byte[ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD + 1];
- rnd.nextBytes(s6);
+ byte[] s6 = MockEventUtils.generatePayload(th + 1);
Map<String, String> m6 = new HashMap<String, String>();
- m6.put(generateString(rnd, nameLimit - 21),
- generateString(rnd, valLimit - 21));
- m6.put(generateString(rnd, nameLimit - 2),
- generateString(rnd, valLimit - 2));
- m6.put(generateString(rnd, nameLimit - 1),
- generateString(rnd, valLimit - 1));
- m6.put(generateString(rnd, nameLimit),
- generateString(rnd, valLimit));
- m6.put(generateString(rnd, nameLimit + 1),
- generateString(rnd, valLimit + 1));
- m6.put(generateString(rnd, nameLimit + 2),
- generateString(rnd, valLimit + 2));
- m6.put(generateString(rnd, nameLimit + 21),
- generateString(rnd, valLimit + 21));
+ m6.put(MockEventUtils.generateHeaderString(nameLimit - 21),
+ MockEventUtils.generateHeaderString(valLimit - 21));
+ m6.put(MockEventUtils.generateHeaderString(nameLimit - 2),
+ MockEventUtils.generateHeaderString(valLimit - 2));
+ m6.put(MockEventUtils.generateHeaderString(nameLimit - 1),
+ MockEventUtils.generateHeaderString(valLimit - 1));
+ m6.put(MockEventUtils.generateHeaderString(nameLimit),
+ MockEventUtils.generateHeaderString(valLimit));
+ m6.put(MockEventUtils.generateHeaderString(nameLimit + 1),
+ MockEventUtils.generateHeaderString(valLimit + 1));
+ m6.put(MockEventUtils.generateHeaderString(nameLimit + 2),
+ MockEventUtils.generateHeaderString(valLimit + 2));
+ m6.put(MockEventUtils.generateHeaderString(nameLimit + 21),
+ MockEventUtils.generateHeaderString(valLimit + 21));
runTest(s6, m6);
- byte[] s7 = new byte[ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD + 2];
- rnd.nextBytes(s7);
+ byte[] s7 = MockEventUtils.generatePayload(th + 2);
runTest(s7, null);
- byte[] s8 = new byte[ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD + 27];
- rnd.nextBytes(s8);
+ byte[] s8 = MockEventUtils.generatePayload(th + 27);
runTest(s8, null);
}
- private String generateString(Random rnd, int size) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < size; i++) {
- int x = Math.abs(rnd.nextInt());
- int y = x % CHARS.length;
- sb.append(CHARS[y]);
- }
- System.out.println("String: " + sb);
- return sb.toString();
- }
-
-
private void runTest(byte[] payload, Map<String, String> headers) {
- PersistableEvent pe = new PersistableEvent(new MockEvent(payload, headers));
+ PersistableEvent pe = new PersistableEvent("test",
+ new MockEvent(payload, headers));
Assert.assertArrayEquals(payload, pe.getPayload());
Map<String, String> h = pe.getHeaders();
if (h == null) {
@@ -129,38 +121,4 @@ public class TestPersistentEvent {
Assert.assertTrue(headers.size() == 0);
}
}
-
-
-
- private static class MockEvent implements Event {
-
- private final byte[] payload;
- private final Map<String, String> headers;
-
- private MockEvent(byte[] payload, Map<String, String> headers) {
- this.payload = payload;
- this.headers = headers;
- }
-
- @Override
- public Map<String, String> getHeaders() {
- return headers;
- }
-
- @Override
- public void setHeaders(Map<String, String> headers) {
-
- }
-
- @Override
- public byte[] getBody() {
- return payload;
- }
-
- @Override
- public void setBody(byte[] body) {
-
- }
-
- }
}