You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2019/07/22 15:16:44 UTC
[streams] branch master updated: STREAMS-647 AS 2.0 RDF
serialization in streams-processor-fullcontact
This is an automated email from the ASF dual-hosted git repository.
sblackmon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/streams.git
The following commit(s) were added to refs/heads/master by this push:
new 8328cb3 STREAMS-647 AS 2.0 RDF serialization in streams-processor-fullcontact
new 4c024f8 Merge pull request #485 from steveblackmon/STREAMS-647
8328cb3 is described below
commit 8328cb3ba5f24ea0696e782cc76b111d65da9907
Author: Steve Blackmon <sb...@apache.org>
AuthorDate: Tue Jul 16 12:00:44 2019 -0500
STREAMS-647 AS 2.0 RDF serialization in streams-processor-fullcontact
https://issues.apache.org/jira/browse/STREAMS-647
---
.../src/main/resources/fullcontact.ttl | 107 ++++
.../fullcontact/FullContactSocialGraph.scala | 164 +++++++
.../fullcontact/PersonEnrichmentProcessor.scala | 175 +++++++
.../fullcontact/util/AffinityOrdering.scala | 38 ++
.../fullcontact/util/FullContactUtils.scala | 537 +++++++++++++++++++++
5 files changed, 1021 insertions(+)
diff --git a/streams-contrib/streams-processor-fullcontact/src/main/resources/fullcontact.ttl b/streams-contrib/streams-processor-fullcontact/src/main/resources/fullcontact.ttl
new file mode 100644
index 0000000..7275e00
--- /dev/null
+++ b/streams-contrib/streams-processor-fullcontact/src/main/resources/fullcontact.ttl
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+PREFIX : <http://streams.apache.org/fullcontact#>
+PREFIX as: <http://www.w3.org/ns/activitystreams#>
+PREFIX dc: <http://purl.org/dc/elements/1.1/>
+PREFIX dct: <http://purl.org/dc/terms/>
+PREFIX owl: <http://www.w3.org/2002/07/owl#>
+PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
+PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
+PREFIX vcard: <http://www.w3.org/2006/vcard/ns#>
+# Ranges below are written as xsd:*, so the XML Schema namespace is bound to "xsd".
+PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
+BASE <http://streams.apache.org/fullcontact#>
+
+# PersonSummary: the top-level FullContact person record.
+:PersonSummary a owl:Class ;
+    rdfs:comment "PersonSummary"@en ;
+    rdfs:label "PersonSummary"@en .
+
+:ageRange a owl:DatatypeProperty ;
+    rdfs:label "ageRange"@en ;
+    rdfs:comment "Age range of the contact."@en ;
+    rdfs:domain :PersonSummary ;
+    rdfs:range xsd:string .
+
+:avatar a owl:DatatypeProperty ;
+    rdfs:label "avatar"@en ;
+    rdfs:comment "URL of the contact's photo."@en ;
+    rdfs:domain :PersonSummary ;
+    rdfs:range xsd:string .
+
+:bio a owl:DatatypeProperty ;
+    rdfs:label "bio"@en ;
+    rdfs:comment "Biography of the contact."@en ;
+    rdfs:domain :PersonSummary ;
+    rdfs:range xsd:string .
+
+:email a owl:DatatypeProperty ;
+    rdfs:label "email"@en ;
+    rdfs:comment "The email address of the contact. (Queryable)"@en ;
+    rdfs:domain :PersonSummary ;
+    rdfs:range xsd:string .
+
+:organization a owl:DatatypeProperty ;
+    rdfs:label "organization"@en ;
+    rdfs:comment "Current or most recent place of work."@en ;
+    rdfs:domain :PersonSummary ;
+    rdfs:range xsd:string .
+
+:title a owl:DatatypeProperty ;
+    rdfs:label "title"@en ;
+    rdfs:comment "Current or most recent job title."@en ;
+    rdfs:domain :PersonSummary ;
+    rdfs:range xsd:string .
+
+:phone a owl:DatatypeProperty ;
+    rdfs:label "phone"@en ;
+    rdfs:comment "Phone number of the contact. (Queryable)"@en ;
+    rdfs:domain :PersonSummary ;
+    rdfs:range xsd:string .
+
+:updated a owl:DatatypeProperty ;
+    rdfs:label "updated"@en ;
+    rdfs:comment "Date-time last updated."@en ;
+    rdfs:domain :PersonSummary ;
+    rdfs:range xsd:string .
+
+:website a owl:DatatypeProperty ;
+    rdfs:label "website"@en ;
+    rdfs:comment "URL of the contact's website."@en ;
+    rdfs:domain :PersonSummary ;
+    rdfs:range xsd:string .
+
+# PersonDetails: add-on details attached to a PersonSummary through :details.
+:PersonDetails a owl:Class ;
+    rdfs:comment "PersonDetails"@en ;
+    rdfs:label "PersonDetails"@en .
+
+:details a owl:ObjectProperty ;
+    rdfs:label "details"@en ;
+    rdfs:comment "When included, additional details about the contact provided through Data Add-ons will be available here."@en ;
+    rdfs:domain :PersonSummary ;
+    rdfs:range :PersonDetails .
+
+:gender a owl:DatatypeProperty ;
+    rdfs:label "gender"@en ;
+    rdfs:comment "Gender of the contact."@en ;
+    rdfs:domain :PersonDetails ;
+    rdfs:range xsd:string .
+
diff --git a/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/FullContactSocialGraph.scala b/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/FullContactSocialGraph.scala
new file mode 100644
index 0000000..929ff09
--- /dev/null
+++ b/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/FullContactSocialGraph.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.fullcontact
+
+import java.io.InputStream
+import java.io.OutputStream
+import java.io.{BufferedInputStream, File, FileInputStream, FileOutputStream, PrintStream}
+import java.util.Scanner
+import java.util.concurrent.Callable
+
+import com.google.common.base.Preconditions
+import com.typesafe.config.Config
+import org.apache.juneau.json.JsonParser
+import org.apache.streams.config.StreamsConfigurator
+import org.apache.streams.fullcontact.FullContactSocialGraph.FullContactSocialGraphStats
+import org.apache.streams.fullcontact.FullContactSocialGraph.typesafe
+import org.apache.streams.fullcontact.pojo.PersonSummary
+import org.apache.streams.fullcontact.util.FullContactUtils
+
+import scala.io.Source
+import scala.util.Failure
+import scala.util.Success
+import scala.util.Try
+
+/**
+ * Produce an activity streams 2.0 social graph from a FullContact response data file.
+ */
+object FullContactSocialGraph {
+
+  lazy val typesafe: Config = StreamsConfigurator.getConfig.getConfig("org.apache.streams.fullcontact.FullContactSocialGraph")
+
+  /**
+   * To use from command line:
+   *
+   * <p/>
+   * java -cp streams-dist-jar-with-dependencies.jar -Dconfig.file=application.conf org.apache.streams.fullcontact.FullContactSocialGraph
+   *
+   * <p/>
+   * Input stream should contain a series of json-serialized `PersonSummary` objects, one per line.
+   *
+   * <p/>
+   * Output stream will contain a TTL-serialized social graph.
+   *
+   * <p/>
+   * Input to the process is:
+   *   A file if the `org.apache.streams.fullcontact.FullContactSocialGraph.input` config key is set
+   *   stdin otherwise
+   *
+   * Output from the process is:
+   *   A file if the `org.apache.streams.fullcontact.FullContactSocialGraph.output` config key is set
+   *   stdout otherwise
+   *
+   * File streams opened here are closed once the job has finished (successfully or not).
+   *
+   * @link org.apache.streams.fullcontact.FullContactSocialGraph
+   * @param args unused; all settings come from configuration
+   * @throws Exception wrapping any non-fatal failure raised while running the job
+   */
+  @throws[Exception]
+  final def main(args: Array[String]): Unit = {
+
+    val inputStream: InputStream =
+      if (typesafe.hasPath("input")) new FileInputStream(new File(typesafe.getString("input")))
+      else System.in
+
+    val outputStream: OutputStream =
+      if (typesafe.hasPath("output")) new FileOutputStream(new File(typesafe.getString("output")))
+      else System.out
+
+    try {
+      // Constructing the job does no work by itself: call() reads the input and writes the graph.
+      Try(new FullContactSocialGraph(inputStream, outputStream).call()) match {
+        case Success(_) => ()
+        case Failure(t) => throw new Exception(t)
+      }
+    } finally {
+      outputStream.flush()
+      // Only close streams this method opened; stdin/stdout belong to the JVM.
+      if (inputStream ne System.in) inputStream.close()
+      if (outputStream ne System.out) outputStream.close()
+    }
+  }
+
+  /** Number of items produced at each stage of a [[FullContactSocialGraph]] run. */
+  case class FullContactSocialGraphStats(
+    inputLines : Int,
+    personSummaries : Int,
+    allOrganizations : Int,
+    allInterestItems : Int,
+    uniqueInterests : Int,
+    topicHierarchy : Int,
+    allProfiles : Int,
+    allProfileRelationships : Int,
+    allEmploymentItems : Int,
+    uniqueEmployers : Int,
+    allUrlRelationships : Int,
+    allImageRelationships : Int
+  )
+}
+
+/**
+ * Reads json-serialized `PersonSummary` lines from `in` and writes the derived Turtle graph to `out`.
+ */
+class FullContactSocialGraph(in: InputStream, out: OutputStream ) extends Callable[FullContactSocialGraphStats] {
+
+  val inputStream: BufferedInputStream = new BufferedInputStream(in)
+  val outputStream: PrintStream = new PrintStream(out)
+
+  /**
+   * Parse every input line, write the Turtle for each derived collection, and return their sizes.
+   *
+   * Every collection is materialized into a Vector. Each one is both written out and counted.
+   * An Iterator (and `.seq` on an Iterator still returns that Iterator) would already be exhausted
+   * when its size is taken, so the stats would report 0.
+   *
+   * @return counts of input lines and of each derived collection
+   */
+  override def call() : FullContactSocialGraphStats = {
+
+    val inputLines = Source.fromInputStream(inputStream).getLines().toVector
+
+    // sequence of all PersonSummary; blank lines carry no record and are not parsed
+    val personSummaries = inputLines
+      .filter(_.trim.nonEmpty)
+      .map(line => JsonParser.DEFAULT.parse(line, classOf[PersonSummary]))
+
+    // PersonSummary derived sequences
+    val allOrganizations = FullContactUtils.allOrganizationItems(personSummaries.iterator).toVector
+    val allInterestItems = FullContactUtils.allInterestItems(personSummaries.iterator).toVector
+    val uniqueInterests = FullContactUtils.uniqueInterests(allInterestItems.iterator).toVector
+    val topicHierarchy = FullContactUtils.topicHierarchy(allInterestItems.iterator).toVector
+
+    val allProfiles = FullContactUtils.allProfiles(personSummaries.iterator).toVector
+    val allProfileRelationships = FullContactUtils.allProfileRelationships(personSummaries.iterator).toVector
+
+    val allEmploymentItems = FullContactUtils.allEmploymentItems(personSummaries.iterator).toVector
+    val uniqueEmployers = FullContactUtils.uniqueEmployers(allEmploymentItems.iterator).toVector
+
+    val allUrlRelationships = FullContactUtils.allUrlRelationships(personSummaries.iterator).toVector
+    val allImageRelationships = FullContactUtils.allImageRelationships(personSummaries.iterator).toVector
+
+    personSummaries.flatMap(FullContactUtils.safe_personSummaryAsTurtle).foreach(outputStream.println(_))
+    allOrganizations.flatMap(FullContactUtils.safe_organizationAsTurtle).foreach(outputStream.println(_))
+
+    uniqueInterests.flatMap(FullContactUtils.safe_interestTopicAsTurtle).foreach(outputStream.println(_))
+    topicHierarchy.map(FullContactUtils.topicRelationshipAsTurtle).foreach(outputStream.println(_))
+    // person -> topic edges; without them the topics above are disconnected from every person
+    personSummaries.flatMap(FullContactUtils.safe_personInterestsAsTurtle).foreach(outputStream.println(_))
+
+    allUrlRelationships.flatMap(FullContactUtils.safe_urlRelationshipAsTurtle).foreach(outputStream.println(_))
+    allImageRelationships.flatMap(FullContactUtils.safe_imageRelationshipAsTurtle).foreach(outputStream.println(_))
+
+    allProfiles.flatMap(FullContactUtils.safe_profileAsTurtle).foreach(outputStream.println(_))
+    allProfileRelationships.flatMap(FullContactUtils.safe_personProfileRelationshipAsTurtle).foreach(outputStream.println(_))
+
+    outputStream.flush()
+
+    FullContactSocialGraphStats(
+      inputLines = inputLines.size,
+      personSummaries = personSummaries.size,
+      allOrganizations = allOrganizations.size,
+      allInterestItems = allInterestItems.size,
+      uniqueInterests = uniqueInterests.size,
+      topicHierarchy = topicHierarchy.size,
+      allProfiles = allProfiles.size,
+      allProfileRelationships = allProfileRelationships.size,
+      allEmploymentItems = allEmploymentItems.size,
+      uniqueEmployers = uniqueEmployers.size,
+      allUrlRelationships = allUrlRelationships.size,
+      allImageRelationships = allImageRelationships.size
+    )
+  }
+}
diff --git a/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/PersonEnrichmentProcessor.scala b/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/PersonEnrichmentProcessor.scala
new file mode 100644
index 0000000..15d06c9
--- /dev/null
+++ b/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/PersonEnrichmentProcessor.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.fullcontact
+
+import java.io.BufferedInputStream
+import java.io.BufferedOutputStream
+import java.io.File
+import java.io.FileInputStream
+import java.io.FileOutputStream
+import java.io.PrintStream
+import java.util.Scanner
+
+import com.typesafe.config.Config
+import org.apache.streams.config.ComponentConfigurator
+import org.apache.streams.config.StreamsConfigurator
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.core.StreamsProcessor
+import org.apache.streams.fullcontact.api.EnrichPersonRequest
+import org.apache.streams.fullcontact.config.FullContactConfiguration
+import org.apache.streams.fullcontact.pojo.PersonSummary
+
+import scala.collection.JavaConversions._
+import scala.io.Source
+import scala.util.Failure
+import scala.util.Success
+import scala.util.Try
+
+/**
+ * Call enrich persons on a series of requests.
+ */
+object PersonEnrichmentProcessor {
+
+  lazy val typesafe: Config = StreamsConfigurator.getConfig.getConfig("org.apache.streams.fullcontact.PersonEnrichmentProcessor")
+
+  /**
+   * To use from command line:
+   *
+   * <p/>
+   * Supply (at least) the following required configuration in application.conf:
+   *
+   * <p/>
+   * org.apache.streams.fullcontact.config.FullContactConfiguration.token = ""
+   *
+   * <p/>
+   * Launch syntax:
+   *
+   * <p/>
+   * java -cp streams-dist-jar-with-dependencies.jar -Dconfig.file=./application.conf org.apache.streams.fullcontact.PersonEnrichmentProcessor
+   *
+   * <p/>
+   * Each input line is one serialized `EnrichPersonRequest`. Each output line is the serialized
+   * `PersonSummary` for a request that was enriched successfully; failed requests produce no line.
+   *
+   * <p/>
+   * Input to the process is:
+   *   A file if the `org.apache.streams.fullcontact.PersonEnrichmentProcessor.input` config key is set
+   *   stdin otherwise
+   *
+   * Output from the process is:
+   *   A file if the `org.apache.streams.fullcontact.PersonEnrichmentProcessor.output` config key is set
+   *   stdout otherwise
+   *
+   * @link org.apache.streams.fullcontact.api.EnrichPersonRequest
+   * @param args unused; all settings come from configuration
+   * @throws Exception Exception
+   */
+  @throws[Exception]
+  final def main(args: Array[String]): Unit = {
+
+    val inputStream = if (typesafe.hasPath("input")) {
+      new BufferedInputStream(new FileInputStream(new File(typesafe.getString("input"))))
+    } else System.in
+
+    val outputStream = if (typesafe.hasPath("output")) {
+      new PrintStream(new FileOutputStream(new File(typesafe.getString("output"))))
+    } else System.out
+
+    val input = Source.fromInputStream(inputStream)
+    val outStream = new PrintStream(new BufferedOutputStream(outputStream))
+    // Created explicitly (not via streamDatums' default argument) so its REST client can be released.
+    val enrichmentProcessor = new PersonEnrichmentProcessor()
+
+    try {
+      val inputDatums = input.getLines().map(entry => new StreamsDatum(entry))
+      val outputDatums = streamDatums(inputDatums)(enrichmentProcessor)
+      val outputLines = outputDatums.map(_.getDocument().asInstanceOf[String])
+      for (line <- outputLines) {
+        outStream.println(line)
+      }
+    } finally {
+      outStream.flush()
+      outStream.close()
+      input.close()
+      enrichmentProcessor.cleanUp()
+    }
+  }
+
+  /** Lazily enrich each request with `processor`, yielding one `PersonSummary` per request. */
+  def stream( iter : Iterator[EnrichPersonRequest] )
+            ( implicit processor : PersonEnrichment = FullContact.getInstance() ) : Iterator[PersonSummary] = {
+    iter.map( item => processor.enrichPerson(item) )
+  }
+
+  /** Lazily run every datum through `processor`, flattening the datums it returns. */
+  def streamDatums( iter : Iterator[StreamsDatum] )
+                  ( implicit processor : PersonEnrichmentProcessor = new PersonEnrichmentProcessor() ) : Iterator[StreamsDatum] = {
+    iter.flatMap( item => processor.process(item) )
+  }
+
+  /** [[stream]] as a function value, using the default `PersonEnrichment` instance. */
+  def processor : Iterator[EnrichPersonRequest] => Iterator[PersonSummary] = {
+    stream(_)
+  }
+
+}
+
+/**
+ * StreamsProcessor that enriches each `EnrichPersonRequest` datum through FullContact and emits
+ * the serialized `PersonSummary`.
+ *
+ * @param config FullContact configuration; detected from the Streams configuration by default
+ */
+class PersonEnrichmentProcessor(config : FullContactConfiguration = new ComponentConfigurator[FullContactConfiguration](classOf[FullContactConfiguration]).detectConfiguration())
+  extends StreamsProcessor with Serializable {
+
+  var personEnrichment = FullContact.getInstance(config)
+
+  /**
+   * Process/Analyze the {@link org.apache.streams.core.StreamsDatum} and return the the StreamsDatums that will
+   * passed to every down stream operation that reads from this processor.
+   *
+   * The datum's document may be an `EnrichPersonRequest` or its serialized String form; any other
+   * type raises an Exception. A failed enrichment call, or one returning null, yields an empty list.
+   *
+   * @param entry StreamsDatum to be processed
+   * @return resulting StreamDatums from processing. Should never be null or contain null object. Empty list OK.
+   */
+  override def process(entry: StreamsDatum): java.util.List[StreamsDatum] = {
+    val request : EnrichPersonRequest = entry.getDocument match {
+      case enrichRequest : EnrichPersonRequest => enrichRequest
+      case json : String => personEnrichment.parser.parse(json, classOf[EnrichPersonRequest])
+      case _ => throw new Exception("invalid input type")
+    }
+    // The typed pattern does not match null, so a null response falls through to the empty result
+    // instead of raising a MatchError.
+    val output : List[StreamsDatum] = Try(personEnrichment.enrichPerson(request)) match {
+      case Success(summary : PersonSummary) => List(new StreamsDatum(personEnrichment.serializer.serialize(summary)))
+      case _ => List.empty[StreamsDatum]
+    }
+    output
+  }
+
+  /**
+   * Each operation must publish an identifier.
+   */
+  override def getId: String = "PersonEnrichmentProcessor"
+
+  /**
+   * This method will be called after initialization/serialization. Initialize any non-serializable objects here.
+   *
+   * A `FullContactConfiguration` argument replaces the client. Any other argument (including null)
+   * keeps the current client, and recreates it from the constructor configuration if cleanUp() has
+   * released it.
+   *
+   * @param configurationObject Any object to help intialize the operation. ie. Map, JobContext, Properties, etc. The type
+   *                            will be based on where the operation is being run (ie. hadoop, storm, locally, etc.)
+   */
+  override def prepare(configurationObject: Any): Unit = {
+    configurationObject match {
+      case fullContactConfiguration : FullContactConfiguration =>
+        personEnrichment = FullContact.getInstance(fullContactConfiguration)
+      case _ if personEnrichment == null =>
+        personEnrichment = FullContact.getInstance(config)
+      case _ =>
+    }
+  }
+
+  /**
+   * No guarantee that this method will ever be called. But upon shutdown of the stream, an attempt to call this method
+   * will be made.
+   * Use this method to terminate connections, etc.
+   * Safe to call more than once.
+   */
+  override def cleanUp(): Unit = {
+    if (personEnrichment != null) {
+      personEnrichment.restClient.close()
+      personEnrichment = null
+    }
+  }
+}
diff --git a/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/util/AffinityOrdering.scala b/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/util/AffinityOrdering.scala
new file mode 100644
index 0000000..dabadc1
--- /dev/null
+++ b/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/util/AffinityOrdering.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.fullcontact.util
+
+import org.apache.juneau.ObjectMap
+
+/**
+ * Orders Juneau `ObjectMap`s by their "affinity" entry, read with "" as the default.
+ *
+ * Null sorts before empty, and empty before non-empty. Two different non-empty values are compared
+ * by their final character only, ignoring case.
+ */
+object AffinityOrdering extends Ordering[ObjectMap] {
+
+  /** Compare two affinity strings under the rules described on [[AffinityOrdering]]. */
+  def sortByAffinityString(s1: String, s2: String): Int = (Option(s1), Option(s2)) match {
+    case (None, None)                 => 0
+    case (None, _)                    => -1
+    case (_, None)                    => 1
+    case (Some(a), Some(b)) if a == b => 0
+    case (Some(a), _) if a.isEmpty    => -1
+    case (_, Some(b)) if b.isEmpty    => 1
+    case (Some(a), Some(b))           => a.takeRight(1).compareToIgnoreCase(b.takeRight(1))
+  }
+
+  /** Compare two maps by their "affinity" values. */
+  def sortByAffinity(o1: ObjectMap, o2: ObjectMap): Int = {
+    val left = o1.getString("affinity", "")
+    val right = o2.getString("affinity", "")
+    sortByAffinityString(left, right)
+  }
+
+  def compare(a: ObjectMap, b: ObjectMap): Int = sortByAffinity(a, b)
+}
diff --git a/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/util/FullContactUtils.scala b/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/util/FullContactUtils.scala
new file mode 100644
index 0000000..e6db251
--- /dev/null
+++ b/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/util/FullContactUtils.scala
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.fullcontact.util
+
+import org.apache.juneau.ObjectList
+import org.apache.juneau.ObjectMap
+import org.apache.juneau.json.JsonParser
+import org.apache.streams.config.ComponentConfigurator
+
+import scala.collection.mutable.ListBuffer
+
+/** An employer, built from a person's employment item name and domain (see uniqueEmployers). */
+case class Employer(name : String, domain : String)
+
+/** Pairs a person id with an employer id. NOTE(review): not used by any code visible here. */
+case class EmployerRelationship(personId : String, employerId : String)
+
+/** An interest topic: id, display name and category copied from a PersonInterestItem. */
+case class InterestTopic(id : String, name : String, category : String)
+
+/** Child topic id -> parent topic id; serialized as skos:broader. */
+case class TopicRelationship(child : String, parent : String)
+
+/** An organization, built from a person's employment item name and domain. */
+case class Organization(name : String, domain : String)
+
+/** Pairs a person id with an organization id. NOTE(review): not used by any code visible here. */
+case class OrganizationRelationship(personId : String, orgId : String)
+
+/** Links a person id to the URI of one of that person's social profiles. */
+case class PersonProfileRelationship(personId : String, profileUri : String)
+
+/** Links an entity URI to an image URL, with the label FullContact reported for it. */
+case class ImageRelationship( entityUri : String, url : String, label : String)
+
+/** Links an entity URI to a web URL, with the label FullContact reported for it. */
+case class UrlRelationship( entityUri : String, url : String, label : String)
+
+object FullContactUtils {
+
+ import org.apache.streams.fullcontact.api._
+ import org.apache.streams.fullcontact.config._
+ import org.apache.streams.fullcontact.pojo._
+
+ import scala.collection.JavaConversions._
+ import scala.util.Try
+
+  /** FullContact client configuration, resolved once from the Streams configuration. */
+  final implicit val fullContactConfiguration: FullContactConfiguration =
+    new ComponentConfigurator(classOf[FullContactConfiguration]).detectConfiguration()
+
+  // Namespaces for profile URIs, one per Streams provider (selected in profileNamespaceAndId).
+  val apst_angellist_ns = "http://streams.apache.org/streams-contrib/streams-provider-angellist#"
+  val apst_facebook_ns = "http://streams.apache.org/streams-contrib/streams-provider-facebook#"
+  val apst_foursquare_ns = "http://streams.apache.org/streams-contrib/streams-provider-foursquare#"
+  val apst_googleplus_ns = "http://streams.apache.org/streams-contrib/streams-provider-googleplus#"
+  val apst_instagram_ns = "http://streams.apache.org/streams-contrib/streams-provider-instagram#"
+  val apst_linkedin_ns = "http://streams.apache.org/streams-contrib/streams-provider-linkedin#"
+  val apst_twitter_ns = "http://streams.apache.org/streams-contrib/streams-provider-twitter#"
+
+  // FullContact base namespace; each entity namespace is a path segment beneath it.
+  val fc_ns = "http://api.fullcontact.com/"
+  val fc_image_ns = s"${fc_ns}image/"
+  val fc_org_ns = s"${fc_ns}organization/"
+  val fc_person_ns = s"${fc_ns}person/"
+  val fc_profile_ns = s"${fc_ns}profile/"
+  val fc_topic_ns = s"${fc_ns}topic/"
+  val fc_url_ns = s"${fc_ns}url/"
+
+  // Prefix labels for the generated Turtle (prefix:localName subjects and types).
+  val fc_prefix = "fc"
+  val fc_image_prefix = "fc_img"
+  val fc_org_prefix = "fc_org"
+  val fc_person_prefix = "fc_person"
+  val fc_profile_prefix = "fc_profile"
+  val fc_topic_prefix = "fc_topic"
+  val fc_url_prefix = "fc_url"
+
+  /** Identifier for an image node: the decimal hash code of its URL (hash codes may collide). */
+  def imgId(url: String): String = Integer.toString(url.hashCode)
+
+  /** Identifier for a link node: the decimal hash code of its URL (hash codes may collide). */
+  def urlId(url: String): String = Integer.toString(url.hashCode)
+
+  /** Identifier for a company: its name with every punctuation character replaced by '-'. */
+  def companyId(company: CompanySummary): String = {
+    val name = company.getName
+    name.replaceAll("""\p{Punct}""", "-")
+  }
+
+  /** Identifier for an organization: its domain with every punctuation character replaced by '-'. */
+  def orgId(organization: Organization): String = {
+    val domain = organization.domain
+    domain.replaceAll("""\p{Punct}""", "-")
+  }
+
+  /** Display label: the input with every punctuation character replaced by a space. */
+  def orgLabel(input: String): String =
+    input.replaceAll("""\p{Punct}""", " ")
+
+  /** Display label for an [[Organization]], derived from its name. */
+  def orgLabel(organization: Organization): String = {
+    val name = organization.name
+    orgLabel(name)
+  }
+
+  /** Identifier for a person: the full name with every non-word character removed. */
+  def personId(input: PersonSummary): String = {
+    val fullName = input.getFullName
+    fullName.replaceAll("""\W""", "")
+  }
+
+  /** Safe form of a person's full name: every non-word character removed. */
+  def personSafeName(input: PersonSummary): String =
+    input.getFullName.replaceAll("""\W""", "")
+
+  /** A profile's username with every non-word character removed. */
+  def profileUsername(input: PersonProfile): String = {
+    val username = input.getUsername
+    username.replaceAll("""\W""", "")
+  }
+
+  /**
+   * Namespace and local id used to build the URI of a social profile.
+   *
+   * Instagram profiles are keyed by sanitized username; every other supported service by
+   * [[profileId]]. An unsupported service raises a MatchError.
+   */
+  def profileNamespaceAndId(profile: PersonProfile): (String, String) = {
+    profile.getService match {
+      case "instagram" =>
+        (apst_instagram_ns, profileUsername(profile))
+      case service =>
+        val namespace = service match {
+          case "angellist"                 => apst_angellist_ns
+          case "twitter"                   => apst_twitter_ns
+          case "facebook" | "facebookpage" => apst_facebook_ns
+          case "foursquare"                => apst_foursquare_ns
+          case "linkedin"                  => apst_linkedin_ns
+          case "googleplus"                => apst_googleplus_ns
+        }
+        (namespace, profileId(profile))
+    }
+  }
+
+  /** Concatenate a (namespace, id) pair into one URI string. */
+  def uriFromNamespaceAndId(ns_id: (String, String)): String = ns_id._1 + ns_id._2
+
+  /**
+   * Turtle prefix label and type name for a profile of the given service.
+   * An unsupported service raises a MatchError.
+   */
+  def profilePrefixAndType(profile: PersonProfile): (String, String) = {
+    val service = profile.getService
+    if (service == "angellist") {
+      ("fc", "Profile")
+    } else {
+      val profileType = service match {
+        case "twitter"                   => "TwitterProfile"
+        case "facebook" | "facebookpage" => "FacebookProfile"
+        case "foursquare"                => "FoursquareProfile"
+        case "linkedin"                  => "LinkedinProfile"
+        case "instagram"                 => "InstagramProfile"
+        case "googleplus"                => "GooglePlusProfile"
+      }
+      ("apst", profileType)
+    }
+  }
+
+  /**
+   * Service-qualified profile identifier, `<service>/<id>`.
+   *
+   * The id is the first usable value among: the user id, the username, or the last path segment
+   * of the profile URL. A null user id or username now falls through to the next candidate, just
+   * like an empty one; previously `.nonEmpty` on null raised a NullPointerException.
+   */
+  def profileId(profile: PersonProfile): String = {
+    def usable(value: String): Option[String] = Option(value).filter(_.nonEmpty)
+    val idOrUsername = usable(profile.getUserid)
+      .orElse(usable(profile.getUsername))
+      .getOrElse(profile.getUrl.split("/").last)
+    s"${profile.getService}/${idOrUsername}"
+  }
+
+  /** Identifier for an interest topic: its FullContact id, unchanged. */
+  def topicId(topic: InterestTopic): String = topic.id
+
+  /**
+   * The profiles present on `profiles`, in a fixed service order: angellist, facebook,
+   * facebookpage, foursquare, googleplus, instagram, linkedin, twitter.
+   *
+   * Services with no profile are skipped. Previously they were returned as null entries, which
+   * inflated counts and forced every caller to cope with nulls. A null `profiles` yields an empty Seq.
+   */
+  def selectProfiles(profiles: PersonProfiles): Seq[PersonProfile] = {
+    if (profiles == null) Seq.empty
+    else Seq(
+      profiles.getAngellist,
+      profiles.getFacebook,
+      profiles.getFacebookpage,
+      profiles.getFoursquare,
+      profiles.getGoogleplus,
+      profiles.getInstagram,
+      profiles.getLinkedin,
+      profiles.getTwitter
+    ).filter(_ != null)
+  }
+
+  /**
+   * One relationship per social profile of `summary` whose URI can be built.
+   *
+   * Profiles of unsupported services, or whose id cannot be derived, are skipped. A summary without
+   * details or profiles, or whose person id cannot be derived, yields no relationships; previously
+   * missing details raised a NullPointerException.
+   */
+  def personProfileRelationships(summary: PersonSummary): Seq[PersonProfileRelationship] = {
+    // Keep only identifiers that were computed successfully and are non-null and non-empty.
+    def usable(attempt: Try[String]): Option[String] =
+      attempt.toOption.filter(value => value != null && value.nonEmpty)
+
+    val profiles: Seq[PersonProfile] = Option(summary.getDetails)
+      .flatMap(details => Option(details.getProfiles))
+      .map(selectProfiles)
+      .getOrElse(Seq.empty)
+
+    // The person id is loop-invariant, so it is computed once rather than once per profile.
+    for {
+      person <- usable(Try(personId(summary))).toSeq
+      profile <- profiles if profile != null
+      uri <- usable(Try(uriFromNamespaceAndId(profileNamespaceAndId(profile)))).toSeq
+    } yield PersonProfileRelationship(person, uri)
+  }
+
+  /**
+   * Every interest item of every person, in input order.
+   * Persons without details or interests contribute nothing (previously a NullPointerException).
+   */
+  def allInterestItems(input: Iterator[PersonSummary]): Iterator[PersonInterestItem] = {
+    input.flatMap { person =>
+      Option(person.getDetails)
+        .flatMap(details => Option(details.getInterests))
+        .map(interests => interests.toSeq)
+        .getOrElse(Seq.empty)
+    }
+  }
+
+  /** Distinct interest topics (id, name, category) across all interest items; order is not kept. */
+  def uniqueInterests(input: Iterator[PersonInterestItem]): Iterator[InterestTopic] = {
+    val topics = input.map { item => InterestTopic(item.getId, item.getName, item.getCategory) }
+    topics.toSet.iterator
+  }
+
+  /**
+   * Distinct child -> parent topic relationships across all interest items; order is not kept.
+   * Items without parent ids contribute nothing (previously a NullPointerException).
+   */
+  def topicHierarchy(input: Iterator[PersonInterestItem]): Iterator[TopicRelationship] = {
+    val relationships = input.flatMap { item =>
+      val parentIds = Option(item.getParentIds).map(ids => ids.toSeq).getOrElse(Seq.empty)
+      parentIds.map(parentId => TopicRelationship(item.getId, parentId))
+    }
+    relationships.toSet.iterator
+  }
+
+  /**
+   * Person -> photo relationships for every person, in input order.
+   *
+   * Persons without details or photos contribute nothing (previously a NullPointerException).
+   * Photos whose relationship cannot be built are skipped.
+   */
+  def allImageRelationships(input: Iterator[PersonSummary]): Iterator[ImageRelationship] = {
+    input.flatMap { person =>
+      val photos = Option(person.getDetails)
+        .flatMap(details => Option(details.getPhotos))
+        .map(items => items.toSeq)
+        .getOrElse(Seq.empty)
+      // Explicit tuple: the method takes one (namespace, id) pair, so avoid relying on auto-tupling.
+      val personUri = Try(uriFromNamespaceAndId((fc_person_ns, personId(person))))
+      photos.flatMap { photo =>
+        personUri.map(uri => ImageRelationship(uri, photo.getValue, photo.getLabel)).toOption.toSeq
+      }
+    }
+  }
+
+  /**
+   * Person -> URL relationships for every person, in input order.
+   *
+   * Persons without details or urls contribute nothing (previously a NullPointerException).
+   * Urls whose relationship cannot be built are skipped.
+   */
+  def allUrlRelationships(input: Iterator[PersonSummary]): Iterator[UrlRelationship] = {
+    input.flatMap { person =>
+      val urls = Option(person.getDetails)
+        .flatMap(details => Option(details.getUrls))
+        .map(items => items.toSeq)
+        .getOrElse(Seq.empty)
+      // Explicit tuple: the method takes one (namespace, id) pair, so avoid relying on auto-tupling.
+      val personUri = Try(uriFromNamespaceAndId((fc_person_ns, personId(person))))
+      urls.flatMap { url =>
+        personUri.map(uri => UrlRelationship(uri, url.getValue, url.getLabel)).toOption.toSeq
+      }
+    }
+  }
+
+  /**
+   * Distinct organizations (name, domain) from every person's employment; order is not kept.
+   * Persons without details or employment contribute nothing (previously a NullPointerException).
+   */
+  def allOrganizationItems(input: Iterator[PersonSummary]): Iterator[Organization] = {
+    val organizations = input.flatMap { person =>
+      Option(person.getDetails)
+        .flatMap(details => Option(details.getEmployment))
+        .map(items => items.toSeq)
+        .getOrElse(Seq.empty)
+        .map(employment => Organization(employment.getName, employment.getDomain))
+    }
+    organizations.toSet.iterator
+  }
+
+  /**
+   * Every social profile of every person, in input order.
+   * Persons without details or profiles contribute nothing; absent (null) profiles are dropped.
+   */
+  def allProfiles(input: Iterator[PersonSummary]): Iterator[PersonProfile] = {
+    input.flatMap { summary =>
+      Option(summary.getDetails)
+        .flatMap(details => Option(details.getProfiles))
+        .map(profiles => selectProfiles(profiles).filter(_ != null))
+        .getOrElse(Seq.empty)
+    }
+  }
+
+  /** Person -> profile relationships of every summary, flattened in input order. */
+  def allProfileRelationships(input: Iterator[PersonSummary]): Iterator[PersonProfileRelationship] =
+    input.flatMap(summary => personProfileRelationships(summary))
+
+  /**
+   * Every employment item of every person, in input order.
+   * Persons without details or employment contribute nothing (previously a NullPointerException).
+   */
+  def allEmploymentItems(input: Iterator[PersonSummary]): Iterator[PersonEmploymentItem] = {
+    input.flatMap { person =>
+      Option(person.getDetails)
+        .flatMap(details => Option(details.getEmployment))
+        .map(items => items.toSeq)
+        .getOrElse(Seq.empty)
+    }
+  }
+
+  /** Distinct employers (name, domain) across all employment items; order is not kept. */
+  def uniqueEmployers(input: Iterator[PersonEmploymentItem]): Iterator[Employer] = {
+    val employers = input.map { item => Employer(item.getName, item.getDomain) }
+    employers.toSet.iterator
+  }
+
+  /**
+   * Turtle for one person: a typed node with its summary properties, followed by one triple per
+   * gender, email, phone and url value.
+   *
+   * Changes from the previous behavior:
+   *  - Null or blank fields are omitted instead of being written as "null".
+   *  - A missing organization, gender or details block no longer throws and drops the whole person.
+   *  - Literal values are escaped, so quotes, backslashes or line breaks cannot produce invalid Turtle.
+   *
+   * @throws Exception if no person id can be derived (e.g. the full name is null)
+   */
+  def personSummaryAsTurtle(root: PersonSummary): String = {
+    val subject = s"${fc_person_prefix}:${personId(root)}"
+
+    // True when a field carries a value worth emitting.
+    def present(value: Any): Boolean = value != null && value.toString.trim.nonEmpty
+
+    // Double-quoted Turtle string literal with backslash, quote and line breaks escaped.
+    def literal(value: Any): String = {
+      val escaped = value.toString
+        .replace("\\", "\\\\")
+        .replace("\"", "\\\"")
+        .replace("\n", "\\n")
+        .replace("\r", "\\r")
+      "\"" + escaped + "\""
+    }
+
+    val organization = Option(root.getOrganization).map(org => orgLabel(org)).orNull
+    val summaryFields: Seq[(String, Any)] = Seq(
+      "as:displayName" -> root.getFullName,
+      "as:url" -> root.getWebsite,
+      "dct:modified" -> root.getUpdated,
+      "vcard:fn" -> root.getFullName,
+      "vcard:org" -> organization,
+      "vcard:title" -> root.getTitle
+    )
+
+    val sb = new StringBuilder()
+    sb.append(subject).append("\n")
+    sb.append(s" a ${fc_prefix}:Person ;\n")
+    for ((predicate, value) <- summaryFields if present(value)) {
+      sb.append(s" ${predicate} ${literal(value)} ;\n")
+    }
+    sb.append(" .\n\n")
+
+    if (present(root.getGender))
+      sb.append(s"${subject} vcard:gender ${literal(root.getGender)} . ").append("\n")
+
+    val details = root.getDetails
+    if (details != null) {
+      val emails = details.getEmails
+      if (emails != null)
+        for (email <- emails if present(email.getValue))
+          sb.append(s"${subject} vcard:email ${literal("mailto:" + email.getValue)} . ").append("\n")
+
+      val phones = details.getPhones
+      if (phones != null)
+        for (tel <- phones if present(tel.getValue))
+          sb.append(s"${subject} vcard:tel ${literal("tel:" + tel.getValue)} . ").append("\n")
+
+      val urls = details.getUrls
+      if (urls != null)
+        for (url <- urls if present(url.getValue))
+          sb.append(s"${subject} vcard:url ${literal(url.getValue)} . ").append("\n")
+    }
+    sb.toString
+  }
+
+  /** [[personSummaryAsTurtle]], with any non-fatal failure reported as `None`. */
+  def safe_personSummaryAsTurtle(root: PersonSummary): Option[String] =
+    Try { personSummaryAsTurtle(root) }.toOption
+
+  /**
+   * Turtle for a URL relationship: an as:Link node minted in the url namespace, plus an
+   * as:link edge from the owning entity to it.
+   */
+  def urlRelationshipAsTurtle(item: UrlRelationship): String = {
+    val linkUri = fc_url_ns + urlId(item.url)
+    Seq(
+      s"<${linkUri}>",
+      " a as:Link ;",
+      s""" as:href "${item.url}" ;""",
+      s""" as:rel "${item.label}" .""",
+      "",
+      s"<${item.entityUri}> as:link <${linkUri}> .",
+      "",
+      ""
+    ).mkString("\n")
+  }
+
+  /** [[urlRelationshipAsTurtle]], with any non-fatal failure reported as `None`. */
+  def safe_urlRelationshipAsTurtle(item: UrlRelationship): Option[String] =
+    Try { urlRelationshipAsTurtle(item) }.toOption
+
/**
  * Renders an image relationship as Turtle.
  *
  * Produces an `as:Image` node (IRI built from `fc_url_ns` and `urlId(item.url)`) carrying
  * the href and rel, plus an `as:image` triple pointing from the owning entity to it.
  *
  * @param item the relationship between an entity and an image URL
  * @return the Turtle text, ending with a blank line
  */
def imageRelationshipAsTurtle(item: ImageRelationship) : String = {
  val imageIri = fc_url_ns + urlId(item.url)
  s"""|<${imageIri}>
      | a as:Image ;
      | as:href "${item.url}" ;
      | as:rel "${item.label}" .
      |
      |<${item.entityUri}> as:image <${imageIri}> .
      |
      |""".stripMargin
}
+
/** Best-effort [[imageRelationshipAsTurtle]]: any non-fatal exception yields `None`. */
def safe_imageRelationshipAsTurtle(item: ImageRelationship) : Option[String] =
  Try { imageRelationshipAsTurtle(item) }.toOption
+
/**
  * Renders an interest topic as a SKOS concept with a preferred label.
  *
  * Every punctuation character in the topic name is replaced by a space before it is used
  * as the label literal.
  *
  * @param topic the topic to render
  * @return the Turtle text, ending with a blank line
  */
def interestTopicAsTurtle(topic: InterestTopic): String = {
  val topicId = topic.id
  val label = topic.name.replaceAll("\\p{Punct}"," ")
  val subject = s"${fc_topic_prefix}:${topicId}"
  s"""|${subject} a skos:Concept .
      |${subject} skos:prefLabel "${label}" .
      |
      |""".stripMargin
}
+
/** Best-effort [[interestTopicAsTurtle]]: any non-fatal exception yields `None`. */
def safe_interestTopicAsTurtle(topic: InterestTopic): Option[String] =
  Try { interestTopicAsTurtle(topic) }.toOption
+
/**
  * Renders a child/parent topic link as a single `skos:broader` triple.
  *
  * @param relationship the child and parent topic ids
  * @return one Turtle triple, without a trailing newline
  */
def topicRelationshipAsTurtle(relationship: TopicRelationship): String = {
  val narrower = s"${fc_topic_prefix}:${relationship.child}"
  val broader = s"${fc_topic_prefix}:${relationship.parent}"
  s"${narrower} skos:broader ${broader} ."
}
+
/**
  * Renders one `foaf:interest` triple per interest listed in the person's details.
  *
  * @param root the person summary whose interests are rendered
  * @return newline-terminated triples, or "" when there are no interests
  */
def personInterestsAsTurtle(root: PersonSummary): String = {
  val subject = s"${fc_person_prefix}:${personId(root)}"
  val interests = root.getDetails.getInterests
  val out = new StringBuilder()
  if (interests != null && !interests.isEmpty) {
    interests.foreach { interest =>
      out.append(s"""${subject} foaf:interest ${fc_topic_prefix}:${interest.getId} . """).append("\n")
    }
  }
  out.toString
}
+
/** Best-effort [[personInterestsAsTurtle]]: any non-fatal exception yields `None`. */
def safe_personInterestsAsTurtle(root: PersonSummary): Option[String] =
  Try { personInterestsAsTurtle(root) }.toOption
+
/**
  * Renders an organization as a Turtle statement typed `fc:Organization`, carrying its
  * display label (from `orgLabel`) and its domain as `as:url`.
  *
  * Literal values are escaped for double-quoted Turtle strings, so quotes, backslashes or
  * line breaks in organization names cannot produce invalid Turtle.
  *
  * @param organization the organization to render
  * @return the Turtle text, ending with a blank line
  */
def organizationAsTurtle(organization: Organization): String = {
  // Escapes a value for a double-quoted Turtle literal. The backslash is escaped first.
  def lit(value: Any): String =
    s"$value".replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n").replace("\r", "\\r")

  val id = orgId(organization)
  s"""|${fc_org_prefix}:${id}
      | a ${fc_prefix}:Organization ;
      | as:displayName "${lit(orgLabel(organization))}" ;
      | as:url "${lit(organization.domain)}" ;
      | .
      |
      |""".stripMargin
}
+
/** Best-effort [[organizationAsTurtle]]: any non-fatal exception yields `None`. */
def safe_organizationAsTurtle(organization: Organization): Option[String] =
  Try { organizationAsTurtle(organization) }.toOption
+
/**
  * Renders a single `as:describes` triple linking a social profile IRI to its person node.
  *
  * @param relationship the profile IRI and the person id it describes
  * @return one Turtle triple, without a trailing newline
  */
def personProfileRelationshipAsTurtle(relationship: PersonProfileRelationship): String = {
  val personNode = s"${fc_person_prefix}:${relationship.personId}"
  s"<${relationship.profileUri}> as:describes ${personNode} ."
}
+
/** Best-effort [[personProfileRelationshipAsTurtle]]: any non-fatal exception yields `None`. */
def safe_personProfileRelationshipAsTurtle(relationship: PersonProfileRelationship): Option[String] =
  Try { personProfileRelationshipAsTurtle(relationship) }.toOption
+
/**
  * Renders a social profile as one Turtle statement.
  *
  * The statement's subject is the profile IRI, and its type comes from
  * `profilePrefixAndType`. Its predicates, in order, are:
  *  - id
  *  - username (both `as:name` and `as:displayName`)
  *  - bio summary
  *  - url
  *  - provider
  *  - optional follower/following counts
  *
  * The statement is terminated with `.`. Without it the `;`-separated predicate list
  * would run into whatever Turtle follows, making the document invalid.
  *
  * Other behavior:
  *  - Punctuation is still stripped from the bio.
  *  - A null bio omits `as:summary` instead of throwing.
  *  - All literals are escaped so quotes, backslashes and line breaks stay valid Turtle.
  *
  * @param profile the profile to render
  * @return the Turtle text, ending with a blank line
  */
def profileAsTurtle(profile: PersonProfile): String = {
  // Escapes a value for a double-quoted Turtle literal. The backslash is escaped first.
  def lit(value: Any): String =
    s"$value".replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n").replace("\r", "\\r")

  val id = profileId(profile)
  val uri = uriFromNamespaceAndId(profileNamespaceAndId(profile))
  val prefix_type = profilePrefixAndType(profile)
  val sb = new StringBuilder()
  sb.append(s"""|<$uri>
                | a ${prefix_type._1}:${prefix_type._2} ;
                | as:id "${lit(id)}" ;
                | as:name "${lit(profile.getUsername)}" ;
                | as:displayName "${lit(profile.getUsername)}" ;
                |""".stripMargin)
  if (profile.getBio != null) {
    val bio = profile.getBio.replaceAll("\\p{Punct}", " ")
    sb.append(s""" as:summary "${lit(bio)}" ;""").append("\n")
  }
  sb.append(s""" as:url "${lit(profile.getUrl)}" ;""").append("\n")
  sb.append(s""" as:provider "${lit(profile.getService)}" ;""").append("\n")
  if (profile.getFollowers != null)
    sb.append(s""" apst:followers "${profile.getFollowers}"^^xs:integer ;""").append("\n")
  if (profile.getFollowing != null)
    sb.append(s""" apst:following "${profile.getFollowing}"^^xs:integer ;""").append("\n")
  // Close the statement (a trailing ';' before '.' is permitted by the Turtle grammar).
  sb.append(" .").append("\n\n")
  sb.toString
}
+
/** Best-effort [[profileAsTurtle]]: any non-fatal exception yields `None`. */
def safe_profileAsTurtle(profile: PersonProfile): Option[String] =
  Try { profileAsTurtle(profile) }.toOption
+
+
/**
  * Renders an enriched company summary as Turtle.
  *
  * Emits one statement typed `fc:Organization` with name, website, bio summary, employee
  * count, founding year and category. It then emits one triple per email, phone and url
  * found in the details.
  *
  * Robustness:
  *  - A null bio, employee count or founding year omits that predicate. Previously a null
  *    bio threw, and null counts were emitted as `"null"^^xs:integer`.
  *  - Punctuation is still stripped from the bio.
  *  - All literals are escaped so quotes, backslashes and line breaks stay valid Turtle.
  *  - A null details object is skipped.
  *
  * @param root the company summary to render
  * @return the Turtle text
  */
def companySummaryAsTurtle(root: CompanySummary): String = {
  import scala.collection.JavaConversions._

  // Escapes a value for a double-quoted Turtle literal. The backslash is escaped first.
  def lit(value: Any): String =
    s"$value".replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n").replace("\r", "\\r")

  val subject = s"${fc_org_prefix}:${companyId(root)}"
  val bio: Option[String] = Option(root.getBio).map(_.replaceAll("\\p{Punct}", " "))
  val predicates: Seq[String] = Seq(
    Some(s"a ${fc_prefix}:Organization"),
    Some(s"""as:displayName "${lit(root.getName)}""""),
    Some(s"""as:url "${lit(root.getWebsite)}""""),
    bio.map(text => s"""as:summary "${lit(text)}""""),
    Option(root.getEmployees).map(count => s"""${fc_prefix}:employees "${count}"^^xs:integer"""),
    Option(root.getFounded).map(year => s"""${fc_prefix}:founded "${year}"^^xs:integer"""),
    Some(s"""vcard:category "${lit(root.getCategory)}"""")
  ).flatten

  val sb = new StringBuilder()
  sb.append(subject).append("\n")
  predicates.foreach(p => sb.append(" ").append(p).append(" ;\n"))
  sb.append(" .\n")

  val details = root.getDetails
  if (details != null) {
    if (details.getEmails != null && !details.getEmails.isEmpty)
      details.getEmails.foreach(email =>
        sb.append(s"""${subject} vcard:email "mailto:${lit(email.getValue)}" . """).append("\n"))
    if (details.getPhones != null && !details.getPhones.isEmpty)
      details.getPhones.foreach(phone =>
        sb.append(s"""${subject} vcard:tel "tel:${lit(phone.getValue)}" . """).append("\n"))
    if (details.getUrls != null && !details.getUrls.isEmpty)
      details.getUrls.foreach(url =>
        sb.append(s"""${subject} vcard:url "${lit(url.getUrl)}" . """).append("\n"))
  }
  sb.toString
}
+
/** Best-effort [[companySummaryAsTurtle]]: any non-fatal exception yields `None`. */
def safe_companySummaryAsTurtle(profile: CompanySummary): Option[String] =
  Try { companySummaryAsTurtle(profile) }.toOption
+
/**
  * Converts an age-range string such as "25-34" into the integer mean of its bounds.
  *
  * The text is split on '-'. Each part is trimmed and parsed as a Long, and the truncated
  * integer mean of the parts is returned, so a single value such as "40" yields itself.
  * Whitespace around the bounds (" 25 - 34 ") is tolerated. Any input that parsed before
  * still yields the same result.
  *
  * @param ageRange the range text; may be null
  * @return the truncated mean of the bounds, or None when the text cannot be parsed
  *         (null, empty, no bounds such as "-", or a non-numeric part)
  */
def normalize_age_range(ageRange: String) : Option[Int] =
  Try {
    val bounds = ageRange.split("-").map(_.trim.toLong).toSeq
    // An empty bounds list divides by zero here; Try turns that into None.
    (bounds.sum / bounds.length).toInt
  }.toOption
+
/**
  * POSTs an enrichment request to the FullContact v3 `person.enrich` endpoint and returns
  * the raw response body.
  *
  * A RestClient is built for this single call. It is closed in a `finally` block once the
  * response has been read, so its HTTP resources are released even when the call fails.
  *
  * @param request the POST body, serialized by the configured JSON serializer
  * @param config  provides the API token, passed to the client's `authorization` setting
  *                as `Bearer <token>`
  * @return the response body as a string
  */
def callEnrichPerson(request : EnrichPersonRequest)(implicit config : FullContactConfiguration) : String = {
  import java.net.URI

  import org.apache.http.client.utils.URIBuilder
  import org.apache.juneau.json.JsonParser
  import org.apache.juneau.json.JsonSerializer
  import org.apache.juneau.rest.client.RestCall
  import org.apache.juneau.rest.client.RestClient

  val auth_header = s"Bearer ${config.getToken()}"
  val url = "https://api.fullcontact.com/v3/person.enrich"
  val uri : URI = new URIBuilder(url).build()
  val parser = JsonParser.create().
    debug().
    ignoreUnknownBeanProperties(true).
    ignorePropertiesWithoutSetters(true).
    build()
  val serializer = JsonSerializer.create().
    debug().
    trimEmptyCollections(true).
    trimNullProperties(true).
    trimEmptyMaps(true).
    build()
  val client : RestClient = RestClient.
    create().
    authorization(auth_header).
    beansRequireSerializable(true).
    debug().
    disableAutomaticRetries().
    disableCookieManagement().
    disableRedirectHandling().
    json().
    parser(parser).
    rootUrl(uri).
    serializer(serializer).
    build()

  try {
    val post : RestCall = client.doPost(uri).body(request)
    // Fixed one-second pause before the response is read, kept from the original.
    // Presumably a crude rate limit; confirm.
    Thread.sleep(1000)
    post.getResponseAsString
  } finally {
    client.close()
  }
}
+
/**
  * Best-effort [[callEnrichPerson]]: any non-fatal exception yields `None`.
  *
  * The FullContactConfiguration required by callEnrichPerson is resolved implicitly from
  * the enclosing scope.
  */
def safeCallEnrichPerson(request : EnrichPersonRequest) : Option[String] =
  Try { callEnrichPerson(request) }.toOption
+
/**
  * Parses a `person.enrich` JSON response body into a PersonSummary using Juneau's default
  * JSON parser.
  *
  * @param response the raw JSON response body
  * @return the parsed summary; parser exceptions propagate to the caller
  */
def parseEnrichPersonResponse(response : String): PersonSummary = {
  import org.apache.juneau.json.JsonParser
  JsonParser.DEFAULT.parse(response, classOf[PersonSummary])
}
+
/** Best-effort [[parseEnrichPersonResponse]]: any non-fatal exception yields `None`. */
def safeParseEnrichPersonResponse(response : String): Option[PersonSummary] =
  Try { parseEnrichPersonResponse(response) }.toOption
+
/**
  * Formats one education entry as "name degree (startYear - endYear)".
  *
  * Missing name or degree render as "". Missing years render as whatever `getAt` returns
  * for an absent path. Returns None if formatting throws.
  */
def educationItem(row : ObjectMap) : Option[String] = Try {
  val school = row.getString("name", "")
  val degree = row.getString("degree", "")
  val startYear = row.getAt("start/year", classOf[String])
  val endYear = row.getAt("end/year", classOf[String])
  f"$school%s $degree%s ($startYear%s - $endYear%s)"
}.toOption

/**
  * Parses a JSON array of education entries and formats each with [[educationItem]].
  *
  * @param json a JSON array; a parse failure here propagates to the caller
  * @return the formatted entries, or an empty list if iterating them fails
  */
def educationSummary(json : String) : List[String] = {
  val entries = JsonParser.DEFAULT.parse(json, classOf[ObjectList])
  Try(entries.elements(classOf[ObjectMap]).flatMap(entry => educationItem(entry)))
    .toOption
    .map(_.toList)
    .getOrElse(List())
}
+
/**
  * Formats one employment entry as "name title (startYear - endYear)".
  *
  * Missing name or title render as "". Returns None if formatting throws.
  */
def employmentItem(row : ObjectMap) : Option[String] = Try {
  val employer = row.getString("name", "")
  val title = row.getString("title", "")
  val startYear = row.getAt("start/year", classOf[String])
  val endYear = row.getAt("end/year", classOf[String])
  f"$employer%s $title%s ($startYear%s - $endYear%s)"
}.toOption

/**
  * Parses a JSON array of employment entries and formats each with [[employmentItem]].
  *
  * @param json a JSON array; a parse failure here propagates to the caller
  * @return the formatted entries, or an empty list if iterating them fails
  */
def employmentSummary(json : String) : List[String] = {
  val entries = JsonParser.DEFAULT.parse(json, classOf[ObjectList])
  Try(entries.elements(classOf[ObjectMap]).flatMap(entry => employmentItem(entry)))
    .toOption
    .map(_.toList)
    .getOrElse(List())
}
+
/** Formats one interest entry as "name (affinity)"; returns None if formatting throws. */
def interestItem(row : ObjectMap) : Option[String] = Try {
  val name = row.getString("name", "")
  val affinity = row.getString("affinity", "")
  f"$name%s ($affinity%s)"
}.toOption

/**
  * Parses a JSON array of interests, orders them with AffinityOrdering, and formats each
  * with [[interestItem]].
  *
  * @param json a JSON array; a parse failure here propagates to the caller
  * @return the formatted interests, or an empty list if sorting or iterating them fails
  */
def interestSummary(json : String) : List[String] = {
  val entries = JsonParser.DEFAULT.parse(json, classOf[ObjectList])
  Try(entries.elements(classOf[ObjectMap]).toSeq.sorted(AffinityOrdering).flatMap(entry => interestItem(entry)))
    .toOption
    .map(_.toList)
    .getOrElse(List())
}
+
/** Renders the "url" value of a profile entry as a string; returns None if that throws. */
def profileItem(row : ObjectMap) : Option[String] = Try {
  val url = row.get("url")
  f"$url%s"
}.toOption

/**
  * Parses a JSON object of profiles and returns the sorted profile urls.
  *
  * Each value is cast to ObjectMap; a value that is not an ObjectMap makes the whole
  * result empty.
  *
  * @param json a JSON object; a parse failure here propagates to the caller
  * @return the sorted urls, or an empty list if collecting them fails
  */
def profileSummary(json : String) : List[String] = {
  val profiles = JsonParser.DEFAULT.parse(json, classOf[ObjectMap])
  Try(profiles.entrySet().flatMap(entry => profileItem(entry.getValue().asInstanceOf[ObjectMap])).toSeq.sorted)
    .toOption
    .map(_.toList)
    .getOrElse(List())
}
+
/** Renders the "value" field of a url entry as a string; returns None if that throws. */
def urlItem(row : ObjectMap) : Option[String] = Try {
  val value = row.get("value")
  f"$value%s"
}.toOption

/**
  * Parses a JSON array of url entries and returns their values, sorted.
  *
  * @param json a JSON array; a parse failure here propagates to the caller
  * @return the sorted values, or an empty list if collecting them fails
  */
def urlSummary(json : String) : List[String] = {
  val entries = JsonParser.DEFAULT.parse(json, classOf[ObjectList])
  Try(entries.elements(classOf[ObjectMap]).flatMap(entry => urlItem(entry)).toSeq.sorted)
    .toOption
    .map(_.toList)
    .getOrElse(List())
}
+
+}